...

In accordance with KStreams DSL Grammar, we introduce the following new elements:

  • split DSLOperation
  • BranchedKStream DSLObject with the following DSLOperations:
    • branch
    • defaultBranch
    • noDefaultBranch
  • Branched DSLParameter.

...

1. The split(Named named) operation returns a BranchedKStream. The Named parameter lets one name the branch operator itself; the individual branches may then get index-suffixed names built from the branch operator name.

The overloaded parameterless alternative split() is also available.

2. BranchedKStream has the following methods:

  • BranchedKStream branch(Predicate<K,V> predicate, Branched<K,V> branched) -- creates a branch for messages that match the predicate and returns this in order to facilitate method chaining.
  • Map<String, KStream> defaultBranch(Branched<K,V> branched) -- creates a default branch (for messages not intercepted by other branches) and returns the dictionary of named KStreams.
  • Map<String, KStream> noDefaultBranch() -- returns the dictionary of named KStreams.
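
The semantics described above can be illustrated with a small in-memory toy model. This is only a sketch and not the Kafka Streams implementation: the class ToyBranchedStream, its String-based naming, and the use of plain lists instead of KStreams are all hypothetical. It also assumes two conventions that the text does not fix: an unnamed branch gets a 1-based index suffix, and each record goes to the first matching branch only (as with the existing KStream#branch).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Toy in-memory model of split()/branch()/defaultBranch(); NOT the Kafka Streams API. */
final class ToyBranchedStream<V> {
    private final String prefix;        // plays the role of the Named passed to split()
    private final List<V> records;      // stands in for the upstream KStream
    private final List<Predicate<V>> predicates = new ArrayList<>();
    private final List<String> names = new ArrayList<>();

    private ToyBranchedStream(String prefix, List<V> records) {
        this.prefix = prefix;
        this.records = records;
    }

    static <V> ToyBranchedStream<V> split(String prefix, List<V> records) {
        return new ToyBranchedStream<>(prefix, records);
    }

    /** Registers a branch and returns this for chaining; a null name falls back
     *  to a 1-based index suffix (an assumed convention). */
    ToyBranchedStream<V> branch(Predicate<V> predicate, String name) {
        predicates.add(predicate);
        names.add(prefix + (name != null ? name : String.valueOf(predicates.size())));
        return this;
    }

    /** Terminal operation: unmatched records are collected under the default branch. */
    Map<String, List<V>> defaultBranch(String name) {
        return route(prefix + name);
    }

    /** Terminal operation: unmatched records are dropped. */
    Map<String, List<V>> noDefaultBranch() {
        return route(null);
    }

    private Map<String, List<V>> route(String defaultName) {
        Map<String, List<V>> out = new LinkedHashMap<>();
        for (String n : names) out.put(n, new ArrayList<>());
        if (defaultName != null) out.put(defaultName, new ArrayList<>());
        for (V v : records) {
            String target = defaultName;
            // First matching predicate wins (assumed, mirrors KStream#branch).
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(v)) {
                    target = names.get(i);
                    break;
                }
            }
            if (target != null) out.get(target).add(v);
        }
        return out;
    }
}

class ToyBranchingDemo {
    public static void main(String[] args) {
        Map<String, List<Integer>> branches = ToyBranchedStream
            .split("numbers-", List.of(-2, 0, 3, 7))
            .branch(v -> v < 0, "negative")   // explicitly named branch
            .branch(v -> v > 5, null)         // unnamed: gets the index suffix "2"
            .defaultBranch("rest");           // everything not intercepted above
        System.out.println(branches);
    }
}
```

In this model the returned map has the keys numbers-negative, numbers-2 and numbers-rest, which shows how the operator name passed to split() becomes the common prefix of all branch names.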

...

There is also a case when one might need to create branches dynamically, e.g. one per enum value. This can be implemented in the following way:

Code Block
languagejava
// One branch per enum value; each RecordType consumes its own sub-stream.
BranchedKStream branched = stream.split();
for (RecordType recordType : RecordType.values()) {
  branched.branch((k, v) -> v.getRecType() == recordType,
    Branched.withJavaConsumer(recordType::processRecords));
}


This is why the 'starting' split() operation is necessary: it is better to have it than to add a new branch method to KStream directly.

Otherwise we would have to treat the first iteration separately, and the code for dynamic branching becomes cluttered:

Code Block
languagejava
RecordType[] recordTypes = RecordType.values();
if (recordTypes.length != 0) {
  // The first branch has to be created on the KStream itself...
  RecordType first = recordTypes[0];
  BranchedKStream branched = stream.
    branch((k, v) -> v.getRecType() == first,
       Branched.withJavaConsumer(first::processRecords));

  // ...and only the remaining branches can be added in the loop.
  for (int i = 1; i < recordTypes.length; i++) {
    RecordType recordType = recordTypes[i]; // effectively final copy for the lambda
    branched.branch((k, v) -> v.getRecType() == recordType,
      Branched.withJavaConsumer(recordType::processRecords));
  }
}

Proposed Changes


Add the new BranchedKStream class and the split() method for KStream (see https://github.com/apache/kafka/pull/6512).

...