...
- In general, the code has poor cohesion: predicates are defined in one place and the corresponding stream processors in another. Whenever something changes, we must remember to edit two pieces of code.
- If the number of predicates is predefined, this method forces us to use 'magic numbers' to extract the right branch from the result (see examples here).
- If we need to build branches dynamically (e.g. one branch per enum value), we inevitably have to deal with 'generic arrays' and 'unchecked typecasts'.
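The last two points can be illustrated without Kafka at all. The sketch below is a plain-Java stand-in, not the Kafka Streams API: `ArrayBranchingSketch`, its `branch` helper, the `RecordType` enum and the `TYPE:payload` string format are all hypothetical names chosen for illustration. It only assumes that an array-based branch operation routes each record to the first predicate it matches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class ArrayBranchingSketch {
    enum RecordType { ORDER, PAYMENT, REFUND }

    // Hypothetical stand-in for an array-returning branch operation:
    // each value goes to the first predicate it matches; unmatched values are dropped.
    @SafeVarargs
    @SuppressWarnings("unchecked")
    static <V> List<V>[] branch(List<V> source, Predicate<V>... predicates) {
        // Java cannot create a generic array directly, hence the unchecked cast.
        List<V>[] result = (List<V>[]) new List[predicates.length];
        for (int i = 0; i < result.length; i++) {
            result[i] = new ArrayList<>();
        }
        for (V value : source) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(value)) {
                    result[i].add(value);
                    break;
                }
            }
        }
        return result;
    }

    // One branch per enum value: the predicate array is again an unchecked generic array.
    @SuppressWarnings("unchecked")
    static List<String>[] branchByType(List<String> source) {
        RecordType[] types = RecordType.values();
        Predicate<String>[] predicates = new Predicate[types.length];
        for (int i = 0; i < types.length; i++) {
            String prefix = types[i].name() + ":";
            predicates[i] = value -> value.startsWith(prefix);
        }
        return branch(source, predicates);
    }

    public static void main(String[] args) {
        List<String>[] branches = branchByType(List.of("ORDER:1", "PAYMENT:2", "ORDER:3"));
        // The caller has to know that index 0 means ORDER and index 1 means PAYMENT.
        System.out.println(branches[0]); // [ORDER:1, ORDER:3]
        System.out.println(branches[1]); // [PAYMENT:2]
    }
}
```

Both `@SuppressWarnings("unchecked")` annotations and the bare indices in `main` are exactly the kind of noise the proposal aims to remove.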
Public Interfaces
The proposed new org.apache.kafka.streams.kstream.KafkaStreamsBrancher class introduces a new standard way to build branches on top of KStream.
In accordance with the KStreams DSL Grammar, we introduce the following new elements:
- split DSLOperation
- KBranchedStream DSLObject with the following DSLOperations:
  - branch
  - defaultBranch
  - noDefaultBranch
- Branched DSLParameter
Description
1. The `split(Named named)` operation returns `KBranchedStream`. The Named parameter is needed so that one can name the branch operator itself; all the branches may then get index-suffixed names built from the branch operator name.
The overloaded parameterless alternative `split()` is also available (in this case, an auto-generated name is used).
2. `KBranchedStream` has the following methods:
- `KBranchedStream branch(Predicate<K,V> predicate, Branched<K,V> branched)` -- creates a branch for messages that match the predicate and returns `this` in order to facilitate method chaining.
- `Map<String, KStream> defaultBranch(Branched<K,V> branched)` -- creates a default branch (for messages not intercepted by other branches) and returns the dictionary of named KStreams.
- `Map<String, KStream> noDefaultBranch()` -- returns the dictionary of named KStreams.
Both the `branch` and `defaultBranch` operations also have overloaded parameterless alternatives.
3. The `Branched` parameter extends `NamedOperation` and has the following builder methods:
- `withChain(Function<KStream<K, V>, KStream<K, V>> chain)` -- a chain of operations applied to a given branch. By default, it is an `s->s` identity function. It can be complex, such as `s->s.mapValues....`, etc.
- `withName(String name)` -- sets the name of the branch (auto-generated by default; when the `split` operation is named, the names are index-suffixed).
- `withConsumer(Consumer<? super KStream<K, V>> consumer)` -- directs the final result of the chain to a given consumer. By default, a NO-OP consumer is provided.
The Map returned by `defaultBranch`/`noDefaultBranch` allows us to collect all the KStream branch objects in a single scope. The branches collected are the results of the transformations defined by the `withChain` functions.
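To make the semantics above concrete, here is a minimal in-memory sketch. It is a toy model and not the Kafka Streams API: `ToySplit`, `ToySplitDemo`, and the use of `List` in place of `KStream` are assumptions made for illustration, and the branch name is passed directly rather than through a `Branched` object. It also assumes that a record goes to the first branch whose predicate it matches.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

// Toy in-memory model of the described semantics; NOT the Kafka Streams API.
class ToySplit<V> {
    // One branch definition: its name, its predicate, and the chain applied to its records.
    private static final class Def<T> {
        final String name;
        final Predicate<T> predicate;
        final UnaryOperator<List<T>> chain;

        Def(String name, Predicate<T> predicate, UnaryOperator<List<T>> chain) {
            this.name = name;
            this.predicate = predicate;
            this.chain = chain;
        }
    }

    private final List<V> source;
    private final List<Def<V>> defs = new ArrayList<>();

    ToySplit(List<V> source) {
        this.source = source;
    }

    // Registers a branch and returns this, so that calls can be chained.
    ToySplit<V> branch(String name, Predicate<V> predicate, UnaryOperator<List<V>> chain) {
        defs.add(new Def<>(name, predicate, chain));
        return this;
    }

    // Adds a catch-all branch with an identity chain, then returns the named results.
    Map<String, List<V>> defaultBranch(String name) {
        branch(name, v -> true, UnaryOperator.identity());
        return noDefaultBranch();
    }

    // Routes each record to the first matching branch (assumed rule); unmatched records are dropped.
    Map<String, List<V>> noDefaultBranch() {
        Map<String, List<V>> buckets = new LinkedHashMap<>();
        for (Def<V> def : defs) {
            buckets.put(def.name, new ArrayList<>());
        }
        for (V value : source) {
            for (Def<V> def : defs) {
                if (def.predicate.test(value)) {
                    buckets.get(def.name).add(value);
                    break;
                }
            }
        }
        // The returned map holds the results of each branch's chain, keyed by branch name.
        Map<String, List<V>> result = new LinkedHashMap<>();
        for (Def<V> def : defs) {
            result.put(def.name, def.chain.apply(buckets.get(def.name)));
        }
        return result;
    }
}

class ToySplitDemo {
    static Map<String, List<String>> run() {
        return new ToySplit<String>(List.of("A1", "B1", "C1", "AB"))
            .branch("a", v -> v.contains("A"),
                s -> s.stream().map(String::toLowerCase).collect(Collectors.toList()))
            .branch("b", v -> v.contains("B"), UnaryOperator.identity())
            .defaultBranch("rest");
    }

    public static void main(String[] args) {
        System.out.println(run()); // {a=[a1, ab], b=[B1], rest=[C1]}
    }
}
```

Note that "AB" lands in branch "a" only, and that the map holds the already transformed (lower-cased) records, mirroring the statement that the collected branches are the results of the chain functions.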
Simple Example: Direct Branch Consuming
    source.split()
        .branch((key, value) -> value.contains("A"), Branched.withConsumer(ks -> ks.to("A")))
        .branch((key, value) -> value.contains("B"), Branched.withConsumer(ks -> ks.to("B")))
        .defaultBranch(Branched.withConsumer(ks -> ks.to("C")));
More Complex Example: Merging Branches
    Map<String, KStream<String, String>> branches = source.split()
        .branch((key, value) -> value == null,
            Branched.<String, String>withName("null").withChain(s -> s.mapValues(v -> "NULL")))
        .defaultBranch(Branched.withName("non-null"));
    branches.get("non-null").merge(branches.get("null"));
Dynamic Branching
There is also the case where one needs to create branches dynamically, e.g. one per enum value. This can be implemented in the following way:
    KBranchedStream branched = stream.split();
    for (RecordType recordType : RecordType.values()) {
        branched.branch((k, v) -> v.getRecType() == recordType,
            Branched.withConsumer(recordType::processRecords));
    }
This is why the 'starting' `split()` operation is necessary, and why it is better to have it than to add a new `branch` method to `KStream` directly.
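The loop pattern can be tried out in isolation with a small stand-in. This is a sketch under stated assumptions, not the proposed API: `ToyBranched`, the `RecordType` values, and the `TYPE:payload` string format are hypothetical, and `List<String>` plays the role of a stream. What it shows is that an object whose `branch` method returns `this` can be extended inside a loop without any generic arrays.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

class DynamicBranchingSketch {
    enum RecordType { ORDER, PAYMENT, REFUND }

    // Hypothetical toy stand-in for a branched stream; NOT the Kafka Streams API.
    static final class ToyBranched {
        private final List<String> remaining;

        ToyBranched(List<String> source) {
            this.remaining = new ArrayList<>(source);
        }

        // Hands the not-yet-claimed records that match to the consumer, returns this for chaining.
        ToyBranched branch(Predicate<String> predicate, Consumer<List<String>> consumer) {
            List<String> matched = new ArrayList<>();
            for (String value : remaining) {
                if (predicate.test(value)) {
                    matched.add(value);
                }
            }
            remaining.removeAll(matched);
            consumer.accept(matched);
            return this;
        }
    }

    // One branch per enum value, built in a plain loop.
    static Map<RecordType, List<String>> splitByType(List<String> records) {
        Map<RecordType, List<String>> byType = new EnumMap<>(RecordType.class);
        ToyBranched branched = new ToyBranched(records);
        for (RecordType type : RecordType.values()) {
            branched.branch(r -> r.startsWith(type.name() + ":"),
                            matched -> byType.put(type, matched));
        }
        return byType;
    }

    public static void main(String[] args) {
        System.out.println(splitByType(List.of("ORDER:1", "PAYMENT:2", "ORDER:3")));
        // {ORDER=[ORDER:1, ORDER:3], PAYMENT=[PAYMENT:2], REFUND=[]}
    }
}
```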
Proposed Changes
Add the new KBranchedStream class and branch() method for KStream (see https://github.com/apache/kafka/pull/6512).
Compatibility, Deprecation, and Migration Plan
The proposed change is backwards compatible. The old KStream#branch method should be deprecated.
Rejected Alternatives
- A KStreamsBrancher class that works the same way, but does not require KStream interface modification:
    new KafkaStreamsBrancher<String, String>()
        .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
        .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
        // default branch should not necessarily be defined in the end!
        .defaultBranch(ks -> ks.to("C"))
        .onTopOf(builder.stream("source"));
Rejected because of validation of method-chaining.
2.
    source
        .branch()
        .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
        .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
        .defaultBranch(ks -> ks.to("C"));
Here the new KStream#branch() method returns a KBranchedStream<K, V> object, which, in turn, contains the `branch` and `defaultBranch` methods. It is critical that the KStream consumers passed to the `branch` methods are invoked immediately, during the `branch` method invocation. This is necessary when we need to gather streams that were defined in separate scopes back into one scope using an auxiliary object:
    @Setter
    class CouponIssuer {
        private KStream<....> coffePurchases;
        private KStream<....> electronicsPurchases;

        KStream<...> coupons() {
            return coffePurchases.join(electronicsPurchases...)...
        }
    }

    CouponIssuer couponIssuer = new CouponIssuer();
    transactionStream.branch()
        .branch(predicate1, couponIssuer::setCoffePurchases)
        .branch(predicate2, couponIssuer::setElectronicsPurchases);
    KStream<..> coupons = couponIssuer.coupons();
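Why immediate invocation matters can be seen in a plain-Java sketch. Everything here is hypothetical and stands in for the real types: `ToyBranched` replaces the branched stream, `CouponInputs` replaces the auxiliary object, and `List<String>` replaces `KStream`. The sketch only assumes that `branch` calls its consumer before returning.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

class EagerConsumerSketch {
    // Auxiliary object that gathers branches defined in separate calls into one scope.
    static final class CouponInputs {
        private List<String> coffee;
        private List<String> electronics;

        void setCoffee(List<String> coffee) { this.coffee = coffee; }
        void setElectronics(List<String> electronics) { this.electronics = electronics; }

        // Uses both branches; fails with NullPointerException if either setter has not run yet.
        int total() { return coffee.size() + electronics.size(); }
    }

    // Hypothetical toy stand-in for a branched stream; NOT the Kafka Streams API.
    static final class ToyBranched {
        private final List<String> remaining;

        ToyBranched(List<String> source) {
            this.remaining = new ArrayList<>(source);
        }

        ToyBranched branch(Predicate<String> predicate, Consumer<List<String>> consumer) {
            List<String> matched = new ArrayList<>();
            for (String value : remaining) {
                if (predicate.test(value)) {
                    matched.add(value);
                }
            }
            remaining.removeAll(matched);
            consumer.accept(matched); // invoked immediately, before branch() returns
            return this;
        }
    }

    static int run() {
        CouponInputs inputs = new CouponInputs();
        new ToyBranched(List.of("coffee:latte", "tv:oled", "coffee:mocha"))
            .branch(v -> v.startsWith("coffee:"), inputs::setCoffee)
            .branch(v -> v.startsWith("tv:"), inputs::setElectronics);
        // Both fields are already populated here because the consumers ran eagerly.
        return inputs.total();
    }

    public static void main(String[] args) {
        System.out.println(run()); // 3
    }
}
```

If the consumers were deferred until some later build step, `inputs.total()` would be reached with both fields still `null`.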
The KStreamsBrancher class alternative, which works the same way but does not require KStream interface modification, was rejected because of the difficulty of having branches in the same scope.