...

  • as(String name) -- sets the name of the branch. By default the name is auto-generated; when the split operation is named, the auto-generated names are that name with an index suffix.
  • with(Function<? super KStream<? super K, ? super V>, ? extends KStream<? extends K, ? extends V>> chain) -- sets the operation to apply to the given branch. By default, it is the identity function s->s. It can be more complex, such as s->s.mapValues...., a composition of functions, etc.
  • with(Consumer<? super KStream<? super K, ? super V>> chain) -- sets a consumer for the given branch.
  • with(Function<? super KStream<? super K, ? super V>, ? extends KStream<? extends K, ? extends V>> chain, String name) -- sets both an operation and a name.
  • with(Consumer<? super KStream<? super K, ? super V>> chain, String name) -- sets both a consumer and a name.

The Map returned by defaultBranch/noDefaultBranch allows us to collect all the KStream branch objects in a single scope. The branches collected are the results of transformations defined by `chain` functions. If a function returns `null`, its result is omitted.

How the resulting Map is formed

The keys of the Map entries are defined by the following rules:

  • If a Named parameter is provided for split, its value is used as a prefix for each key. By default, no prefix is used.
  • If a name is provided for the branch, its value is appended to the prefix to form the Map key.
  • If a name is not provided for the branch, then the key defaults to prefix + position of the branch as a decimal number, starting from "1".
  • If a name is not provided for the defaultBranch call, then the key defaults to prefix + "0".
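The key rules above can be sketched in plain Java. This is only an illustration of the naming scheme, not the proposed implementation: the class BranchKeys and its two helper methods are hypothetical names, an absent Named parameter is modelled as an empty prefix, and an absent branch name is modelled as null.

```java
// Hypothetical helper illustrating the Map key rules; not part of the proposed API.
class BranchKeys {

    /**
     * Key for a regular branch: prefix + explicit name if one was given,
     * otherwise prefix + the 1-based position of the branch.
     */
    static String branchKey(String prefix, String branchName, int position) {
        return prefix + (branchName != null ? branchName : Integer.toString(position));
    }

    /**
     * Key for the default branch: prefix + explicit name if one was given,
     * otherwise prefix + "0".
     */
    static String defaultBranchKey(String prefix, String branchName) {
        return prefix + (branchName != null ? branchName : "0");
    }
}
```

With the prefix "foo-", a branch named "bar" maps to "foo-bar", an unnamed fourth branch maps to "foo-4", and an unnamed default branch maps to "foo-0".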

The values of the Map entries are formed as follows:

  • If no chain function or consumer is provided, then the value is the branch itself (which is equivalent to the ks->ks identity chain function).
  • If a chain function is provided and returns a non-null value for a given branch, then the value is the result returned by this function.
  • If a chain function returns null for a given branch, then the respective entry is not added to the Map.
  • If a consumer is provided for a given branch, then the respective entry is not added to the Map.

For example:

Code Block
languagejava
var result =
    source.split(Named.as("foo-"))
        .branch(predicate1, Branched.as("bar"))             // "foo-bar"
        .branch(predicate2, Branched.with(ks->ks.to("A")))  // no entry: a Consumer is provided
        .branch(predicate3, Branched.with(ks->null))        // no entry: chain function returns null
        .branch(predicate4)                                 // "foo-4": name defaults to the branch position
        .defaultBranch();                                   // "foo-0": "0" is the default name for the default branch
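To see which entries end up in the Map, the value rules can be traced with a small stand-alone model. This is a sketch under stated assumptions, not the proposed implementation: BranchValues, putWithFunction and putWithConsumer are hypothetical names, the generic type S stands in for a KStream branch, and the keys are passed in already formed.

```java
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of how Map values are formed; S stands in for a KStream branch.
class BranchValues {

    /**
     * Branch handled by a chain function. A null chain models "no chain provided"
     * and behaves like the identity function. If the chain returns null,
     * no entry is added to the result.
     */
    static <S> void putWithFunction(Map<String, S> result, String key, S branch, Function<S, S> chain) {
        S value = (chain == null) ? branch : chain.apply(branch);
        if (value != null) {
            result.put(key, value);
        }
    }

    /**
     * Branch handled by a consumer. The consumer is applied to the branch,
     * and no entry is added to any result Map.
     */
    static <S> void putWithConsumer(S branch, Consumer<S> consumer) {
        consumer.accept(branch);
    }
}
```

Replaying the example above through this model (identity for "foo-bar", a consumer for the second branch, a null-returning function for the third, identity for "foo-4" and "foo-0") leaves exactly three entries in the Map: "foo-bar", "foo-4" and "foo-0".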

Usage Examples

The following examples demonstrate some standard use cases for the proposed API.

Simple Example: Direct Branch Consuming

...

Code Block
languagejava
source.split()
    .branch((key, value) -> value.contains("A"), Branched.with(ks -> ks.to("A")))
    .branch((key, value) -> value.contains("B"), Branched.with(ks -> ks.to("B")))
    .defaultBranch(Branched.with(ks -> ks.to("C")));

More Complex Example: Merging Branches

...

Code Block
languagejava
class Branched<K, V> implements NamedOperation<Branched<K, V>> {
    // Static factory methods declare their own type parameters, since static methods cannot use the class's K and V.
    static <K, V> Branched<K, V> as(String name);
    static <K, V> Branched<K, V> with(Function<? super KStream<? super K, ? super V>, ? extends KStream<? extends K, ? extends V>> chain);
    static <K, V> Branched<K, V> with(Consumer<? super KStream<? super K, ? super V>> chain);
    static <K, V> Branched<K, V> with(Function<? super KStream<? super K, ? super V>, ? extends KStream<? extends K, ? extends V>> chain, String name);
    static <K, V> Branched<K, V> with(Consumer<? super KStream<? super K, ? super V>> chain, String name);
}

...