...
JIRA: KAFKA-8296 (ASF JIRA)
Pull request: PR-9107
Motivation
KStream#branch
method uses varargs to supply predicates and returns an array of streams ('Each stream in the result array corresponds position-wise (index) to the predicate in the supplied predicates').
This is poor API design that makes building branches very inconvenient because of the 'impedance mismatch' between arrays and generics in the Java language.
...
as(String name)
-- sets the name of the branch (auto-generated by default; when the split operation is named, the names are index-suffixed).
withFunction(Function<? super KStream<K, V>, ? extends KStream<K, V>> chain)
— sets an operation for a given branch. By default, it is an s->s identity function. Can be complex, like s->s.mapValues...., a composition of functions etc.
withConsumer(Consumer<? super KStream<K, V>> chain)
— sets a consumer for a given branch.
withFunction(Function<? super KStream<K, V>, ? extends KStream<K, V>> chain, String name)
— sets both an operation and a name.
withConsumer(Consumer<? super KStream<K, V>> chain, String name)
— sets both a consumer and a name.
...
```java
class Branched<K, V> implements Named<Branched<K,V>> {
    static Branched<K, V> as(String name);
    static Branched<K, V> withFunction(Function<? super KStream<K, V>, ? extends KStream<K, V>> chain);
    static Branched<K, V> withConsumer(Consumer<? super KStream<K, V>> chain);
    static Branched<K, V> withFunction(Function<? super KStream<K, V>, ? extends KStream<K, V>> chain, String name);
    static Branched<K, V> withConsumer(Consumer<? super KStream<K, V>> chain, String name);
}
```
...