Status

Current state: Accepted

Discussion thread: here

Voting thread: here

...

JIRA: KAFKA-8296

Pull request: PR-9107

Motivation

The KStream#branch method takes its predicates as varargs and returns an array of streams ('Each stream in the result array corresponds position-wise (index) to the predicate in the supplied predicates').

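This is a poor API design that makes building branches very inconvenient because of the 'impedance mismatch' between arrays and generics in the Java language: an array of a parameterized type cannot be created without an unchecked cast, and callers must keep track of which array index belongs to which predicate.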
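The mismatch can be reproduced without Kafka. The following is a minimal sketch in plain Java, assuming a hypothetical helper `branch` over `List<T>` that only mimics the varargs-in, array-out shape described above. `ArrayBranchSketch` and `branch` are illustrative names, not Kafka API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for a varargs-in, array-out branching method.
final class ArrayBranchSketch {

    // Generic varargs need @SafeVarargs, and the result array needs an
    // unchecked cast because "new List<T>[n]" does not compile.
    @SafeVarargs
    @SuppressWarnings("unchecked")
    static <T> List<T>[] branch(List<T> input, Predicate<? super T>... predicates) {
        List<T>[] result = (List<T>[]) new List[predicates.length];
        for (int i = 0; i < result.length; i++) {
            result[i] = new ArrayList<>();
        }
        for (T item : input) {
            // Route each item to the first matching predicate; drop it if none match.
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(item)) {
                    result[i].add(item);
                    break;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer>[] parts = branch(List.of(1, 2, 3, 4, 5), n -> n % 2 == 0, n -> n > 3);
        // The caller has to remember that index 0 is "even" and index 1 is "greater than 3".
        System.out.println(parts[0] + " " + parts[1]); // prints "[2, 4] [5]"
    }
}
```

The unchecked cast exists only because the result is an array of a parameterized type, and the meaning of each branch is carried by nothing but its position in that array.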

...

  • as(String name) -- sets the name of the branch. By default the name is auto-generated; when the split operation is named, the branch names are index-suffixed.
  • withFunction(Function<? super KStream<K, V>, ? extends KStream<K, V>> chain) -- sets the operation applied to a given branch. By default it is the identity function s->s. It can be more complex, such as s->s.mapValues..., a composition of functions, etc.
  • withConsumer(Consumer<? super KStream<K, V>> chain) -- sets a consumer for a given branch.
  • withFunction(Function<? super KStream<K, V>, ? extends KStream<K, V>> chain, String name) -- sets both an operation and a name.
  • withConsumer(Consumer<? super KStream<K, V>> chain, String name) -- sets both a consumer and a name.
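How the five factory methods relate can be sketched with a small self-contained model. This is not the Kafka Streams class: `BranchedSketch` is a hypothetical name, the type parameter `S` stands in for `KStream<K, V>`, and `applyTo` is an invented helper. The sketch also assumes that a function yields the transformed branch while a consumer takes the branch and yields nothing.

```java
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model, not the Kafka Streams class: S stands in for KStream<K, V>.
final class BranchedSketch<S> {
    private final String name;                               // null: name left to auto-generation
    private final Function<? super S, ? extends S> function; // null when a consumer is set
    private final Consumer<? super S> consumer;              // null when a function is set

    private BranchedSketch(String name,
                           Function<? super S, ? extends S> function,
                           Consumer<? super S> consumer) {
        this.name = name;
        this.function = function;
        this.consumer = consumer;
    }

    static <S> BranchedSketch<S> as(String name) {
        // No chain supplied: fall back to the identity function s -> s.
        return new BranchedSketch<S>(name, s -> s, null);
    }

    static <S> BranchedSketch<S> withFunction(Function<? super S, ? extends S> chain) {
        return new BranchedSketch<S>(null, chain, null);
    }

    static <S> BranchedSketch<S> withConsumer(Consumer<? super S> chain) {
        return new BranchedSketch<S>(null, null, chain);
    }

    static <S> BranchedSketch<S> withFunction(Function<? super S, ? extends S> chain, String name) {
        return new BranchedSketch<S>(name, chain, null);
    }

    static <S> BranchedSketch<S> withConsumer(Consumer<? super S> chain, String name) {
        return new BranchedSketch<S>(name, null, chain);
    }

    Optional<String> name() {
        return Optional.ofNullable(name);
    }

    // Assumption of this sketch: a function returns the transformed branch,
    // a consumer takes the branch and returns nothing.
    Optional<S> applyTo(S branch) {
        if (consumer != null) {
            consumer.accept(branch);
            return Optional.empty();
        }
        return Optional.ofNullable(function.apply(branch));
    }

    public static void main(String[] args) {
        // A plain String plays the role of a stream here.
        BranchedSketch<String> upper = BranchedSketch.withFunction(s -> s.toUpperCase(), "upper");
        BranchedSketch<String> plain = BranchedSketch.as("plain");
        System.out.println(upper.applyTo("abc")); // prints "Optional[ABC]"
        System.out.println(plain.applyTo("abc")); // prints "Optional[abc]"
    }
}
```

The bounded wildcards mean a caller may pass a function or consumer declared on a supertype of `S`, which is the same flexibility the `? super` and `? extends` bounds give the signatures listed above.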

...

class Branched<K, V> implements Named<Branched<K,V>> {
    // Static factories declare their own <K, V>: class type parameters are not visible in a static context.
    static <K, V> Branched<K, V> as(String name);
    static <K, V> Branched<K, V> withFunction(Function<? super KStream<K, V>, ? extends KStream<K, V>> chain);
    static <K, V> Branched<K, V> withConsumer(Consumer<? super KStream<K, V>> chain);
    // Overloads that set the chain and the branch name together.
    static <K, V> Branched<K, V> withFunction(Function<? super KStream<K, V>, ? extends KStream<K, V>> chain, String name);
    static <K, V> Branched<K, V> withConsumer(Consumer<? super KStream<K, V>> chain, String name);
}

...