Status
Current state: Accepted
Discussion thread: here
Voting thread: here
JIRA: KAFKA-5488, KAFKA-8296
Pull request: PR-9107
Motivation
The `KStream#branch` method uses varargs to supply predicates and returns an array of streams ('Each stream in the result array corresponds position-wise (index) to the predicate in the supplied predicates').
This is a poor API design that makes building branches very inconvenient because of the 'impedance mismatch' between arrays and generics in the Java language.
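To illustrate the mismatch outside Kafka, here is a minimal plain-Java sketch. `ArrayBranching.branch` is a hypothetical helper, not Kafka code, that mimics the array-returning style with `List<T>` standing in for `KStream<K, V>`. Java forbids creating an array of a generic type, so the implementation needs a raw array and an unchecked cast; and because an abstract interface method such as `KStream#branch` cannot be annotated `@SafeVarargs`, callers passing generic predicates can also receive unchecked-generic-array-creation warnings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical plain-Java analogue of the array-returning branch style.
// List<T> stands in for KStream<K, V>; this is not Kafka code.
final class ArrayBranching {
    private ArrayBranching() {
    }

    // @SafeVarargs is allowed here only because the method is static;
    // an abstract interface method cannot carry it.
    @SafeVarargs
    static <T> List<T>[] branch(List<T> input, Predicate<? super T>... predicates) {
        // "new List<T>[n]" does not compile (generic array creation),
        // so a raw array plus an unchecked cast is the usual workaround.
        @SuppressWarnings("unchecked")
        List<T>[] result = (List<T>[]) new List[predicates.length];
        for (int i = 0; i < predicates.length; i++) {
            result[i] = new ArrayList<>();
        }
        // Each element goes to the first branch whose predicate matches;
        // elements matching no predicate are dropped.
        for (T item : input) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(item)) {
                    result[i].add(item);
                    break;
                }
            }
        }
        return result;
    }
}
```

The caller then has to remember that `result[0]` holds the first predicate's matches, `result[1]` the second's, and so on, with no name attached to either.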
...
- `as(String name)`: sets the name of the branch (auto-generated by default; when the `split` operation is named, the names are index-suffixed).
- `withFunction(Function<? super KStream<K, V>, ? extends KStream<K, V>> chain)`: sets an operation with a given branch. By default, it is an `s->s` identity function. It can be complex, like `s->s.mapValues(...)`, a composition of functions, etc.
- `withConsumer(Consumer<? super KStream<K, V>> chain)`: sets a consumer for a given branch.
- `withFunction(Function<? super KStream<K, V>, ? extends KStream<K, V>> chain, String name)`: sets both an operation and a name.
- `withConsumer(Consumer<? super KStream<K, V>> chain, String name)`: sets both a consumer and a name.
The Map returned by `defaultBranch`/`noDefaultBranch` allows us to collect all the KStream branch objects in a single scope. The branches collected are the results of the transformations defined by the chain functions (set via `withFunction`).
How the resulting Map is formed
The keys of the Map entries are defined by the following rules:
- If a `Named` parameter was provided for `split`, its value is used as a prefix for each key. By default, no prefix is used.
- If a name is provided for the `branch`, its value is appended to the prefix to form the Map key.
- If a name is not provided for the branch, then the key defaults to prefix + position of the branch as a decimal number, starting from "1".
- If a name is not provided for the `defaultBranch` call, then the key defaults to prefix + "0".
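The key rules above can be restated as a small pure function. This is a hypothetical helper, not part of the proposed API; it assumes the split name and the branch name are passed as nullable strings and that the default branch is represented by position 0.

```java
// Hypothetical helper (not part of Kafka) that restates the key rules above.
final class BranchKeys {
    private BranchKeys() {
    }

    /**
     * @param splitName  value of the Named passed to split(), or null if none
     * @param branchName name set through Branched, or null if none
     * @param position   1-based position of a branch() call, or 0 for defaultBranch()
     */
    static String key(String splitName, String branchName, int position) {
        // No prefix is used unless split() was named.
        String prefix = splitName == null ? "" : splitName;
        // An explicit branch name wins; otherwise fall back to the position ("1", "2", ... or "0").
        String suffix = branchName != null ? branchName : Integer.toString(position);
        return prefix + suffix;
    }
}
```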
The values of the Map entries are formed as follows:
- If no chain function or consumer is provided, then the value is the branch itself (which is equivalent to the `ks -> ks` identity chain function).
- If a chain function is provided and returns a non-null value for a given branch, then the value is the result returned by this function.
- If a chain function returns `null` for a given branch, then the respective entry is not put into the map.
- If a consumer is provided for a given branch, then the respective entry is not put into the map.
For example:
```java
var result =
    source.split(Named.as("foo-"))
        .branch(predicate1, Branched.as("bar"))                      // "foo-bar"
        .branch(predicate2, Branched.withConsumer(ks -> ks.to("A"))) // no entry: a Consumer is provided
        .branch(predicate3, Branched.withFunction(ks -> null))       // no entry: chain function returns null
        .branch(predicate4)                                          // "foo-4": name defaults to the branch position
        .defaultBranch();                                            // "foo-0": "0" is the default name for the default branch
```
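To make both sets of rules concrete without a Kafka cluster, the following is a toy, in-memory model. `ToySplit` and its methods are hypothetical: a `List<V>` stands in for a `KStream<K, V>`, and the nullable `name`, `chain` and `consumer` arguments stand in for what `Branched` would carry. Records are handled as a batch purely for illustration, and, as with `KStream#branch`, each record is assumed to go to the first branch whose predicate matches.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Toy, in-memory model of split()/branch()/defaultBranch(); not Kafka code.
// A List<V> plays the role of a KStream<K, V>.
final class ToySplit<V> {
    private final String prefix;
    private final Map<String, List<V>> result = new LinkedHashMap<>();
    private List<V> remaining;
    private int position = 0;

    ToySplit(String splitName, List<V> input) {
        this.prefix = splitName == null ? "" : splitName;
        this.remaining = new ArrayList<>(input);
    }

    // name, chain and consumer may each be null; set at most one of chain/consumer.
    ToySplit<V> branch(Predicate<? super V> predicate, String name,
                       Function<List<V>, List<V>> chain, Consumer<List<V>> consumer) {
        position++;
        List<V> matched = new ArrayList<>();
        List<V> rest = new ArrayList<>();
        for (V record : remaining) {
            if (predicate.test(record)) {
                matched.add(record); // the first matching branch takes the record
            } else {
                rest.add(record);    // later branches only see unmatched records
            }
        }
        remaining = rest;
        emit(name != null ? name : Integer.toString(position), matched, chain, consumer);
        return this;
    }

    // The default branch receives every record that no earlier predicate matched.
    Map<String, List<V>> defaultBranch(String name,
                                       Function<List<V>, List<V>> chain, Consumer<List<V>> consumer) {
        emit(name != null ? name : "0", remaining, chain, consumer);
        remaining = new ArrayList<>();
        return result;
    }

    private void emit(String suffix, List<V> branch,
                      Function<List<V>, List<V>> chain, Consumer<List<V>> consumer) {
        if (consumer != null) {
            consumer.accept(branch); // consumer provided: no map entry
            return;
        }
        List<V> value = chain == null ? branch : chain.apply(branch); // no chain: identity
        if (value != null) {
            result.put(prefix + suffix, value); // chain returned null: no map entry
        }
    }
}
```

Fed the records 1 to 6 with predicates mirroring the example above (`v == 1` named "bar", `v == 2` with a consumer, `v == 3` with a chain returning `null`, `v == 4` unnamed, then the default branch), this model returns a map with exactly the keys "foo-bar", "foo-4" and "foo-0", the last one holding the unmatched records 5 and 6.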
Usage Examples
The following examples demonstrate some standard use cases for the proposed API.
Simple Example: Direct Branch Consuming
In many cases we do not need a single scope for all the branches, because each branch is processed completely independently of the others. In that case we can use 'consuming' lambdas or method references in the `Branched` parameter:
```java
source.split()
    .branch((key, value) -> value.contains("A"), Branched.withConsumer(ks -> ks.to("A")))
    .branch((key, value) -> value.contains("B"), Branched.withConsumer(ks -> ks.to("B")))
    .defaultBranch(Branched.withConsumer(ks -> ks.to("C")));
```
More Complex Example: Merging Branches
In other cases we want to combine branches again after splitting. The map returned by the `defaultBranch`/`noDefaultBranch` methods provides access to the branches in the same scope:
```java
Map<String, KStream<String, String>> branches = source.split()
    .branch((key, value) -> value == null,
            Branched.withFunction(s -> s.mapValues(v -> "NULL"), "null"))
    .defaultBranch(Branched.as("non-null"));

// Both branches are available in the same scope and can be merged again.
KStream<String, String> merged = branches.get("non-null").merge(branches.get("null"));
```
...
```java
var branched = stream.split();
// One branch per enum constant; each branch is consumed by the constant itself.
for (RecordType recordType : RecordType.values()) {
    branched.branch((k, v) -> v.getRecType() == recordType,
                    Branched.withConsumer(recordType::processRecords));
}
```
...
```java
RecordType[] recordTypes = RecordType.values();
if (recordTypes.length != 0) {
    // The first branch has to be created separately from the loop.
    var branched = stream.branch((k, v) -> v.getRecType() == recordTypes[0],
                                 Branched.withConsumer(recordTypes[0]::processRecords));
    for (int i = 1; i < recordTypes.length; i++) {
        // The loop index is not effectively final, so copy the element before capturing it.
        RecordType recordType = recordTypes[i];
        branched.branch((k, v) -> v.getRecType() == recordType,
                        Branched.withConsumer(recordType::processRecords));
    }
}
```
...
```java
BranchedKStream<K, V> split();
BranchedKStream<K, V> split(Named n);
```
2. Deprecate the existing `KStream#branch` method.
3. Add and implement the following Branched class:
```java
class Branched<K, V> implements NamedOperation<Branched<K, V>> {
    static <K, V> Branched<K, V> as(String name);
    static <K, V> Branched<K, V> withFunction(Function<? super KStream<K, V>, ? extends KStream<K, V>> chain);
    static <K, V> Branched<K, V> withConsumer(Consumer<? super KStream<K, V>> chain);
    static <K, V> Branched<K, V> withFunction(Function<? super KStream<K, V>, ? extends KStream<K, V>> chain, String name);
    static <K, V> Branched<K, V> withConsumer(Consumer<? super KStream<K, V>> chain, String name);
}
```
4. Add and implement the following BranchedKStream interface:
...