Versions Compared

Comment: completely rewritten after discussion

...

  • In general, the code has poor cohesion: we need to define predicates in one place and the respective stream processors in another. When something changes, we must remember to edit both pieces of code.
  • If the number of predicates is predefined, this method forces us to use 'magic numbers' to extract the right branch from the result (see examples here).
  • If we need to build branches dynamically (e.g. one branch per enum value), we inevitably have to deal with 'generic arrays' and 'unchecked typecasts'.
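
The last two points can be illustrated without Kafka at all. The sketch below is only an illustration under stated assumptions: it uses `java.util.function.BiPredicate` as a stand-in for the Kafka Streams predicate type, and `RecordType` is a hypothetical enum. It shows that Java cannot create an array of a parameterized type directly, so a raw array has to be created and cast, and that the caller then has to address the result by index.

```java
import java.util.function.BiPredicate;

// Plain-Java illustration; BiPredicate stands in for the Kafka Streams predicate type.
final class GenericArrayDemo {
    // Hypothetical enum, used only for illustration.
    enum RecordType { ORDER, PAYMENT, REFUND }

    // Builds one predicate per enum value. `new BiPredicate<String, RecordType>[n]`
    // does not compile, so a raw array is created and cast (an unchecked cast).
    @SuppressWarnings("unchecked")
    static BiPredicate<String, RecordType>[] predicatesPerType() {
        RecordType[] types = RecordType.values();
        BiPredicate<String, RecordType>[] predicates =
                (BiPredicate<String, RecordType>[]) new BiPredicate[types.length];
        for (int i = 0; i < types.length; i++) {
            RecordType expected = types[i];
            predicates[i] = (key, value) -> value == expected;
        }
        return predicates;
    }

    public static void main(String[] args) {
        BiPredicate<String, RecordType>[] predicates = predicatesPerType();
        // The caller must know that index 1 means PAYMENT: a 'magic number'.
        System.out.println(predicates[1].test("k", RecordType.PAYMENT)); // true
    }
}
```

An array-based branching API pushes both the cast and the index bookkeeping onto every caller that builds its predicates dynamically.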

Public Interfaces

The proposed new org.apache.kafka.streams.kstream.KafkaStreamsBrancher class introduces a new standard way to build branches on top of KStream.

In accordance with KStreams DSL Grammar, we introduce the following new elements:

  • split DSLOperation
  • KBranchedStream DSLObject with the following DSLOperations:
    • branch
    • defaultBranch
    • noDefaultBranch
  • Branched DSLParameter.

Description

1. The split(Named named) operation returns KBranchedStream. The Named parameter lets one name the branch operator itself, so that all the branches can get index-suffixed names built from the branch operator name.

The overloaded parameterless alternative `split()` is also available (in this case, an auto-generated name is used).

2. KBranchedStream has the following methods:

  • KBranchedStream branch(Predicate<K,V> predicate, Branched<K,V> branched) -- creates a branch for messages that match the predicate and returns `this` in order to facilitate method chaining.
  • Map<String, KStream> defaultBranch(Branched<K,V> branched) -- creates a default branch (for messages not intercepted by other branches) and returns the dictionary of named KStreams.
  • Map<String, KStream> noDefaultBranch() -- returns the dictionary of named KStreams.

Both branch and defaultBranch operations also have overloaded parameterless alternatives.
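
The routing behaviour described above can be sketched with a toy in-memory model. This is not the Kafka Streams API: `ToySplit` is a hypothetical class that works on a plain `List` instead of a stream and takes branch names as plain strings instead of `Branched` parameters. It also assumes that a record goes to the first branch whose predicate matches, as with the existing `KStream#branch` method.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Toy in-memory model of the routing semantics; NOT the Kafka Streams API.
final class ToySplit<V> {
    private final List<V> records;
    private final List<String> names = new ArrayList<>();
    private final List<Predicate<V>> predicates = new ArrayList<>();

    ToySplit(List<V> records) { this.records = records; }

    // Registers a named branch and returns `this` to allow method chaining.
    ToySplit<V> branch(String name, Predicate<V> predicate) {
        names.add(name);
        predicates.add(predicate);
        return this;
    }

    // Terminal operation: records not matched by any branch go to the default branch.
    Map<String, List<V>> defaultBranch(String defaultName) { return route(defaultName); }

    // Terminal operation: records not matched by any branch are dropped.
    Map<String, List<V>> noDefaultBranch() { return route(null); }

    private Map<String, List<V>> route(String defaultName) {
        Map<String, List<V>> result = new LinkedHashMap<>();
        for (String name : names) result.put(name, new ArrayList<>());
        if (defaultName != null) result.put(defaultName, new ArrayList<>());
        for (V record : records) {
            String target = defaultName;
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(record)) { target = names.get(i); break; }
            }
            if (target != null) result.get(target).add(record);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> branches = new ToySplit<>(List.of("A1", "B1", "AB", "C1"))
                .branch("a", v -> v.contains("A"))
                .branch("b", v -> v.contains("B"))
                .defaultBranch("other");
        System.out.println(branches); // {a=[A1, AB], b=[B1], other=[C1]}
    }
}
```

Note that "AB" lands in branch "a" only, because the first matching predicate wins in this model.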

3. Branched parameter extends NamedOperation and has the following builder methods:

  • withChain(Function<KStream<K, V>, KStream<K, V>> chain) -- a chain of operations applied to a given branch. By default, it is the identity function s->s. It can be more complex, e.g. s->s.mapValues.... etc.
  • withName(String name) -- sets the name of the branch (auto-generated by default; when the split operation is named, the names are index-suffixed).
  • withConsumer(Consumer<? super KStream<K, V>> consumer) -- directs the final result of chain to a given consumer. By default, a NO-OP consumer is provided.

The Map returned by defaultBranch/noDefaultBranch allows us to collect all the KStream branch objects in a single scope. The branches collected are the results of the transformations defined by the `withChain` functions.

Simple Example: Direct Branch Consuming

Code Block
languagejava
source.split()
    .branch((key, value) -> value.contains("A"), Branched.withConsumer(ks -> ks.to("A")))
    .branch((key, value) -> value.contains("B"), Branched.withConsumer(ks -> ks.to("B")))
    .defaultBranch(Branched.withConsumer(ks -> ks.to("C")));

More Complex Example: Merging Branches

Code Block
languagejava
Map<String, KStream<String, String>> branches = source.split()
    // replace null values with a "NULL" marker in a dedicated branch
    .branch((key, value) -> value == null, Branched.withName("null").withChain(s -> s.mapValues(v -> "NULL")))
    .defaultBranch(Branched.withName("non-null"));
// merge the two branches back into a single stream
branches.get("non-null").merge(branches.get("null"));

Dynamic Branching

There is also a case when one might need to create branches dynamically, e.g. one per enum value. This can be implemented in the following way:

Code Block
languagejava
KBranchedStream branched = stream.split();
for (RecordType recordType : RecordType.values()) {
    branched.branch((k, v) -> v.getRecType() == recordType,
            Branched.withConsumer(recordType::processRecords));
}


This is why the 'starting' `split()` operation is necessary, and why it is better to have it than to add a new `branch` method to `KStream` directly.

Proposed Changes


Add the new KBranchedStream class and branch() method for KStream (see https://github.com/apache/kafka/pull/6512).

Compatibility, Deprecation, and Migration Plan

The proposed change is backwards compatible. The old KStream#branch method should be deprecated.

Rejected Alternatives

  1. A KStreamsBrancher class that works the same way, but does not require KStream interface modification:
Code Block
languagejava
new KafkaStreamsBrancher<String, String>()
   .branch((key, value) -> value.contains("A"), ks->ks.to("A"))
   .branch((key, value) -> value.contains("B"), ks->ks.to("B"))
   //default branch should not necessarily be defined in the end!
   .defaultBranch(ks->ks.to("C"))
   .onTopOf(builder.stream("source"));


Rejected because of violation of method-chaining.


2.

Code Block
languagejava
source
   .branch()
   .branch((key, value) -> value.contains("A"), ks->ks.to("A"))
   .branch((key, value) -> value.contains("B"), ks->ks.to("B"))
   .defaultBranch(ks->ks.to("C"));

Here the new KStream#branch() method returns a KBranchedStream<K, V> object, which, in turn, contains `branch` and `defaultBranch` methods. It is critical that the KStream consumers passed to the `branch` methods are invoked immediately, during the `branch` method invocation. This is necessary when we need to gather streams that were defined in separate scopes back into one scope using an auxiliary object:

Code Block
languagejava
@Setter
class CouponIssuer{
   private KStream<....> coffePurchases;
   private KStream<....> electronicsPurchases;

   KStream<...> coupons(){
       return coffePurchases.join(electronicsPurchases...)...
  }
}

CouponIssuer couponIssuer = new CouponIssuer();

transactionStream.branch()
     .branch(predicate1, couponIssuer::setCoffePurchases)
     .branch(predicate2, couponIssuer::setElectronicsPurchases);

KStream<..> coupons = couponIssuer.coupons();
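
Why immediate invocation matters can be shown with a toy model. The sketch below is not the Kafka Streams API: `Handle`, `Collector`, `EagerBrancher` and `DeferredBrancher` are hypothetical names, and a `Handle` merely stands in for a KStream reference. It contrasts a brancher that calls each consumer inside `branch` with one that only stores the consumers for later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model of eager vs. deferred consumer invocation; NOT the Kafka Streams API.
final class EagerConsumerDemo {
    // Hypothetical stand-in for a KStream reference.
    static final class Handle {
        final String name;
        Handle(String name) { this.name = name; }
    }

    // Auxiliary object that gathers handles defined in separate scopes.
    static final class Collector {
        Handle first;
        Handle second;
        void setFirst(Handle h) { first = h; }
        void setSecond(Handle h) { second = h; }
        boolean ready() { return first != null && second != null; }
    }

    // Eager variant: the consumer runs inside branch().
    static final class EagerBrancher {
        private int index = 0;
        EagerBrancher branch(Consumer<Handle> consumer) {
            consumer.accept(new Handle("branch-" + index++));
            return this;
        }
    }

    // Deferred variant: consumers are only stored until build() is called.
    static final class DeferredBrancher {
        private final List<Consumer<Handle>> pending = new ArrayList<>();
        DeferredBrancher branch(Consumer<Handle> consumer) {
            pending.add(consumer);
            return this;
        }
        void build() {
            for (int i = 0; i < pending.size(); i++) {
                pending.get(i).accept(new Handle("branch-" + i));
            }
        }
    }

    public static void main(String[] args) {
        Collector eager = new Collector();
        new EagerBrancher().branch(eager::setFirst).branch(eager::setSecond);
        System.out.println(eager.ready()); // true

        Collector deferred = new Collector();
        DeferredBrancher brancher = new DeferredBrancher()
                .branch(deferred::setFirst)
                .branch(deferred::setSecond);
        System.out.println(deferred.ready()); // false: no consumer has run yet
        brancher.build();
        System.out.println(deferred.ready()); // true
    }
}
```

With the deferred variant, an auxiliary object such as CouponIssuer would still hold null fields when `coupons()` is called right after the chain.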


Rejected Alternatives

Add KStreamsBrancher class that works the same way, but does not require KStream interface modification:

...


This was rejected because of the difficulty of having branches in the same scope.