THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
as(String name)
-- sets the name of the branch (auto-generated by default, whensplit
operation is named, then the names are index-suffixed).with(Function<? super KStream<? super K, ? super V>, ? extends KStream<? extends K, ? extends V>> chain)
-- — sets an operation with a given branch. By default, it is ans->s
identity function. Can be complex, likes->s.mapValues....
, a composition of functions etc.with(Consumer<? super KStream<? super K, ? super V>> chain)
— sets a consumer for a given branch.with(Function<? super KStream<? super K, ? super V>, ? extends KStream<? extends K, ? extends V>> chain, String name)
-- — sets both an operation and a name.with(Consumer<? super KStream<? super K, ? super V>> chain, String name)
— sets both a consumer and a name.
The Map returned by defaultBranch
/noDefaultBranch
allows us to collect all the KStream branch objects in a single scope. The branches collected are the results of transformations defined by `chain` functions. If a function returns `null`, its result is omitted.
How the resulting Map is formed
The keys of the Map entries are defined by the following rules:
- If
Named
parameter was provided forsplit
, its value is used as a prefix for each key. By default, no prefix is used - If a
name
is provided for thebranch
, its value is appended to the prefix to form the Map key - If a
name
is not provided for the branch, then the key defaults to prefix + position of the branch as a decimal number, starting from "1" - If a
name
is not provided for thedefaultBranch
call, then the key defaults to prefix + "0"
The values of the Map entries are formed as following:
- If no chain function or consumer is provided, then the value is the branch itself (which is equivalent to
ks→ks
identity chain function) - If a chain function is provided and returns a non-null value for a given branch, then the value is the result returned by this function
- If a chain function returns
null
for a given branch, then the respective entry is not put to the map - If a consumer is provided for a given branch, then the the respective entry is not put to the map
For example:
Code Block | ||
---|---|---|
| ||
var result =
source.split(Named.as("foo-"))
.branch(predicate1, Branched.as("bar")) // "foo-bar"
.branch(predicate2, Branched.with(ks->ks.to("A")) // no entry: a Consumer is provided
.branch(predicate3, Branched.with(ks->null)) // no entry: chain function returns null
.branch(predicate4) // "foo-4": name defaults to the branch position
.defaultBranch() // "foo-0": "0" is the default name for the default branch |
Usage Examples
The following section demonstrates some standard use cases for the proposed API
Simple Example: Direct Branch Consuming
...
Code Block | ||
---|---|---|
| ||
source.split() .branch((key, value) -> value.contains("A"), Branched.with(ks->{ks>ks.to("A"); return ks;})) .branch((key, value) -> value.contains("B"), Branched.with(ks->{ks>ks.to("B"); return ks;})) .defaultBranch(Branched.withJavaConsumerwith(ks->{ks>ks.to("C"); return ks;})); |
More Complex Example: Merging Branches
...
Code Block | ||
---|---|---|
| ||
class Branched<K, V> implements Named<Branched<K,V>> { static Branched<K, V> as(String name); static Branched<K, V> with(Function<? super KStream<? super K, ? super V>, ? extends KStream<? extends K, ? extends V>> chain); static Branched<K, V> with(Consumer<? super KStream<? super K, ? super V>> chain); static Branched<K, V> with(Function<? super KStream<? super K, ? super V>, ? extends KStream<? extends K, ? extends V>> chain, String name); static Branched<K, V> with(Consumer<? super KStream<? super K, ? super V>> chain, String name); } |
...