...
In accordance with the KStreams DSL Grammar, we introduce the following new elements:
- split DSLOperation
- BranchedKStream DSLObject with the following DSLOperations: branch, defaultBranch, noDefaultBranch
- Branched DSLParameter
...
1. The split(Named named) operation returns BranchedKStream. The Named parameter is needed so that one can name the branch operator itself; all the branches can then get index-suffixed names built from the branch operator name. The overloaded parameterless alternative split() is also available.
2. BranchedKStream has the following methods:
- BranchedKStream branch(Predicate<K,V> predicate, Branched<K,V> branched) -- creates a branch for messages that match the predicate and returns this in order to facilitate method chaining.
- Map<String, KStream> defaultBranch(Branched<K,V> branched) -- creates a default branch (for messages not intercepted by other branches) and returns the dictionary of named KStreams.
- Map<String, KStream> noDefaultBranch() -- returns the dictionary of named KStreams.
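The routing behaviour described in items 1 and 2 can be illustrated with a small, self-contained toy model. This is only a sketch: ToyBranchedStream is a hypothetical in-memory stand-in, not the Kafka Streams class, and it works on a plain list of values rather than a KStream. Two details are assumptions made for illustration and are not fixed by the text above: records go to the first branch whose predicate matches, and branch names are built as the operator name plus a 1-based index, with suffix 0 for the default branch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Toy model of the proposed split()/branch()/defaultBranch() flow.
// Hypothetical names throughout; this is NOT the Kafka Streams API.
public class BranchingSketch {

    static final class ToyBranchedStream<V> {
        private final String operatorName;   // plays the role of the Named parameter of split()
        private final List<V> records;       // stand-in for the records flowing through a KStream
        private final List<Predicate<V>> predicates = new ArrayList<>();

        ToyBranchedStream(String operatorName, List<V> records) {
            this.operatorName = operatorName;
            this.records = records;
        }

        // Registers a branch and returns this, so that calls can be chained.
        ToyBranchedStream<V> branch(Predicate<V> predicate) {
            predicates.add(predicate);
            return this;
        }

        // Terminal operation: unmatched records go to an extra default branch.
        Map<String, List<V>> defaultBranch() {
            return route(true);
        }

        // Terminal operation: unmatched records are dropped.
        Map<String, List<V>> noDefaultBranch() {
            return route(false);
        }

        private Map<String, List<V>> route(boolean withDefault) {
            Map<String, List<V>> result = new LinkedHashMap<>();
            // Assumed naming scheme: operator name + 1-based branch index.
            for (int i = 0; i < predicates.size(); i++) {
                result.put(operatorName + (i + 1), new ArrayList<>());
            }
            if (withDefault) {
                result.put(operatorName + "0", new ArrayList<>());
            }
            for (V record : records) {
                boolean matched = false;
                // Assumed semantics: the first matching predicate wins.
                for (int i = 0; i < predicates.size() && !matched; i++) {
                    if (predicates.get(i).test(record)) {
                        result.get(operatorName + (i + 1)).add(record);
                        matched = true;
                    }
                }
                if (!matched && withDefault) {
                    result.get(operatorName + "0").add(record);
                }
            }
            return result;
        }
    }

    // Splits the numbers 1..6 into "even", "divisible by 3", and "everything else".
    public static Map<String, List<Integer>> demo() {
        return new ToyBranchedStream<Integer>("split-", List.of(1, 2, 3, 4, 5, 6))
                .branch(v -> v % 2 == 0)
                .branch(v -> v % 3 == 0)
                .defaultBranch();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the value 6 lands only in the first branch, because it is intercepted by the even-number predicate before the second predicate is tried; the values 1 and 5 match neither predicate and end up in the default branch.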
...
There is also a case where one might need to create branches dynamically, e.g., one per enum value. This can be implemented in the following way:
```java
BranchedKStream branched = stream.split();
for (RecordType recordType : RecordType.values())
    branched.branch((k, v) -> v.getRecType() == recordType,
        Branched.withJavaConsumer(recordType::processRecords));
```
This is why the 'starting' split() operation is necessary: it is better to have it than to add a new branch method to KStream directly.
Otherwise we would have to treat the first iteration separately, and the code for dynamic branching would become cluttered:
```java
RecordType[] recordTypes = RecordType.values();
if (recordTypes.length != 0) {
    // The first branch has to be created on the KStream itself.
    BranchedKStream branched = stream.
        branch((k, v) -> v.getRecType() == recordTypes[0],
            Branched.withJavaConsumer(recordTypes[0]::processRecords));
    for (int i = 1; i < recordTypes.length; i++) {
        // Copy into a local so that the lambdas capture an effectively final variable.
        RecordType recordType = recordTypes[i];
        branched.branch((k, v) -> v.getRecType() == recordType,
            Branched.withJavaConsumer(recordType::processRecords));
    }
}
```
Proposed Changes
Add the new BranchedKStream class and branch() method for KStream (see https://github.com/apache/kafka/pull/6512).
...