...

  • `withChain(Function<KStream<K, V>, KStream<K, V>> chain)` -- a chain of operations to apply to the given branch. By default, it is the identity function `s -> s`. It can be arbitrarily complex, for example `s -> s.mapValues(...)` or a composition of several functions.
  • `withName(String name)` -- sets the name of the branch. If no name is set, one is auto-generated; when the split operation itself is named, the generated branch names are index-suffixed.
  • `withJavaConsumer(Consumer<? super KStream<K, V>> consumer)` -- directs the final result of the chain to the given `java.util.function.Consumer`. (The method is called `withJavaConsumer` so that it is not confused with Kafka's `Consumer` interface.) By default, a no-op consumer is provided.
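The defaults described above can be illustrated with a small sketch that uses only the standard library. Everything in it is hypothetical. `ToyBranched`, `resolveName` and `apply` are illustration-only names, and a `List<V>` stands in for `KStream<K, V>`. The naming scheme (split name plus index, with a made-up `"branch-"` fallback) is one assumed reading of the `withName` rule, not the proposed implementation.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of the proposed Branched parameter object.
// A List<V> stands in for KStream<K, V>; this is NOT the Kafka Streams API.
final class ToyBranched<V> {
    final String name;                        // null means "auto-generate a name"
    final Function<List<V>, List<V>> chain;   // default: identity s -> s
    final Consumer<? super List<V>> consumer; // default: no-op

    private ToyBranched(String name, Function<List<V>, List<V>> chain,
                        Consumer<? super List<V>> consumer) {
        this.name = name;
        this.chain = chain;
        this.consumer = consumer;
    }

    // Each factory sets one option; the other options keep their defaults.
    static <V> ToyBranched<V> withName(String name) {
        return new ToyBranched<V>(name, s -> s, s -> { });
    }

    static <V> ToyBranched<V> withChain(Function<List<V>, List<V>> chain) {
        return new ToyBranched<V>(null, chain, s -> { });
    }

    static <V> ToyBranched<V> withJavaConsumer(Consumer<? super List<V>> consumer) {
        return new ToyBranched<V>(null, s -> s, consumer);
    }

    // Assumed naming rule: explicit name wins, otherwise split name (or "branch-") plus the branch index.
    String resolveName(String splitName, int index) {
        if (name != null) {
            return name;
        }
        return (splitName != null ? splitName : "branch-") + index;
    }

    // Runs the chain on the branch contents, hands the result to the consumer, and returns it.
    List<V> apply(List<V> branch) {
        List<V> result = chain.apply(branch);
        consumer.accept(result);
        return result;
    }
}
```

The model makes the point of the defaults visible: a branch configured only with a name still passes its contents through unchanged and discards nothing into a consumer.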

...

```java
// Records whose value contains neither "A" nor "B" go to the default branch (topic "C").
source.split()
    .branch((key, value) -> value.contains("A"), Branched.withJavaConsumer(ks -> ks.to("A")))
    .branch((key, value) -> value.contains("B"), Branched.withJavaConsumer(ks -> ks.to("B")))
    .defaultBranch(Branched.withJavaConsumer(ks -> ks.to("C")));
```
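The routing in this example can be modelled without Kafka. The sketch below assumes the same semantics as the existing `KStream#branch`: each record goes to the first branch whose predicate matches, and to the default branch only if none match. `ToySplit` and its methods are hypothetical stand-ins that operate on a list of values. They are not the proposed API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stdlib model of split().branch(...).defaultBranch(...).
// Assumption: a value goes to the FIRST matching branch only, else to the default branch.
final class ToySplit<V> {
    private final List<V> source;
    private final Map<String, Predicate<V>> predicates = new LinkedHashMap<>(); // keeps declaration order

    ToySplit(List<V> source) {
        this.source = source;
    }

    ToySplit<V> branch(String name, Predicate<V> predicate) {
        predicates.put(name, predicate);
        return this;
    }

    // Evaluates all branches; values matching no predicate end up under defaultName.
    Map<String, List<V>> defaultBranch(String defaultName) {
        Map<String, List<V>> out = new LinkedHashMap<>();
        for (String name : predicates.keySet()) {
            out.put(name, new ArrayList<>());
        }
        out.put(defaultName, new ArrayList<>());
        for (V value : source) {
            String target = defaultName;
            for (Map.Entry<String, Predicate<V>> e : predicates.entrySet()) {
                if (e.getValue().test(value)) {
                    target = e.getKey();
                    break; // first match wins; later predicates are not consulted
                }
            }
            out.get(target).add(value);
        }
        return out;
    }
}
```

Under this assumption, a value such as `"AB"` is routed only to branch `A`, even though it also contains `"B"`.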

More Complex Example: Merging Branches

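```java
Map<String, KStream<String, String>> branches = source.split()
    // in the "null" branch, null values are replaced with the string "NULL"
    .branch((key, value) -> value == null,
        Branched.withName("null").withChain(s -> s.mapValues(v -> "NULL")))
    .defaultBranch(Branched.withName("non-null"));
// merge() returns a new stream; it does not modify the stream it is called on
KStream<String, String> merged = branches.get("non-null").merge(branches.get("null"));
```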
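As a rough check of what this example computes, here is a stdlib-only sketch. Null values are mapped to `"NULL"` in the `null` branch, and both branches are then merged back into one sequence. `ToyMerge.mergeNullBranch` is a hypothetical helper, and a `List` stands in for each stream. The concatenation order is an artifact of the model: a merged `KStream` preserves the relative order only within each input stream, not across the two.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the "merging branches" example; Lists stand in for KStreams.
final class ToyMerge {
    // Splits values into "null" and "non-null" branches, maps nulls to "NULL", and merges the results.
    static List<String> mergeNullBranch(List<String> source) {
        Map<String, List<String>> branches = new LinkedHashMap<>();
        branches.put("null", new ArrayList<>());
        branches.put("non-null", new ArrayList<>());
        for (String v : source) {
            if (v == null) {
                branches.get("null").add("NULL"); // models withChain(s -> s.mapValues(v -> "NULL"))
            } else {
                branches.get("non-null").add(v);  // default branch with the identity chain
            }
        }
        // Models merge() as plain concatenation; real stream merging gives no cross-input ordering.
        List<String> merged = new ArrayList<>(branches.get("non-null"));
        merged.addAll(branches.get("null"));
        return merged;
    }
}
```

The net effect is the same as a single `mapValues` that replaces nulls. The example is mainly meant to show how named branches are retrieved from the returned map and recombined.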

...

```java
KBranchedStream branched = stream.split();
// Every enum constant, including the first one, is registered by the same loop body
for (RecordType recordType : RecordType.values()) {
    branched.branch((k, v) -> v.getRecType() == recordType,
        Branched.withJavaConsumer(recordType::processRecords));
}
```


This is why the 'starting' `split()` operation is necessary, and why it is better to have it than to add a new `branch` method to `KStream` directly. Otherwise the first iteration has to be treated separately, and the code for dynamic branching becomes cluttered:

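```java
RecordType[] recordTypes = RecordType.values();
if (recordTypes.length == 0) return;

// The first branch has to be created directly on the KStream...
KBranchedStream branched = stream.branch(
    (k, v) -> v.getRecType() == recordTypes[0],
    Branched.withJavaConsumer(recordTypes[0]::processRecords));

// ...and only the remaining branches can be added in the loop
for (int i = 1; i < recordTypes.length; i++) {
    final RecordType recordType = recordTypes[i]; // lambdas need an effectively final variable
    branched.branch((k, v) -> v.getRecType() == recordType,
        Branched.withJavaConsumer(recordType::processRecords));
}
```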
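The advantage of the first loop over the second can be checked with a small stdlib model. In the sketch below, the hypothetical `ToyBranchedStream` plays the role of `KBranchedStream`, and `split()` is a toy factory that returns an empty builder. `RecordType`, `Rec` and the logging consumer are illustration-only stand-ins for the record types and `processRecords` above. Routing assumes first-match semantics. Because the builder starts empty, every enum constant, including the first, is registered by the same loop body, and an empty `RecordType.values()` needs no special case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stdlib model of dynamic branching over an enum; not the Kafka Streams API.
final class ToyDynamicBranching {
    enum RecordType { ORDER, PAYMENT, REFUND }

    static final class Rec {
        final RecordType recType;
        final String payload;

        Rec(RecordType recType, String payload) {
            this.recType = recType;
            this.payload = payload;
        }
    }

    // Stand-in for KBranchedStream: collects (predicate, consumer) pairs, then routes records.
    static final class ToyBranchedStream {
        private final List<Predicate<Rec>> predicates = new ArrayList<>();
        private final List<Consumer<List<Rec>>> consumers = new ArrayList<>();

        ToyBranchedStream branch(Predicate<Rec> predicate, Consumer<List<Rec>> consumer) {
            predicates.add(predicate);
            consumers.add(consumer);
            return this;
        }

        // Routes each record to the first matching branch, then feeds each branch to its consumer.
        void run(List<Rec> records) {
            List<List<Rec>> buckets = new ArrayList<>();
            for (int i = 0; i < predicates.size(); i++) {
                buckets.add(new ArrayList<>());
            }
            for (Rec r : records) {
                for (int i = 0; i < predicates.size(); i++) {
                    if (predicates.get(i).test(r)) {
                        buckets.get(i).add(r);
                        break;
                    }
                }
            }
            for (int i = 0; i < consumers.size(); i++) {
                consumers.get(i).accept(buckets.get(i));
            }
        }
    }

    // Toy counterpart of the 'starting' split(): an empty builder.
    static ToyBranchedStream split() {
        return new ToyBranchedStream();
    }

    // One uniform loop registers a branch per enum constant; returns "TYPE:count" per branch.
    static List<String> processAll(List<Rec> records) {
        List<String> log = new ArrayList<>();
        ToyBranchedStream branched = split();
        for (RecordType recordType : RecordType.values()) {
            branched.branch(r -> r.recType == recordType,
                    recs -> log.add(recordType + ":" + recs.size()));
        }
        branched.run(records);
        return log;
    }
}
```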

Proposed Changes


Add the new `KBranchedStream` class and the `split()` method for `KStream` (see https://github.com/apache/kafka/pull/6512).

...