Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: rewritten the proposal as a result of discussion in maillist

...


we could use


Code Block
languagejava
newsource
  KafkaStreamsBrancher<String, String>.branch()
   .branch((key, value) -> value.contains("A"), ks->ks.to("A"))
   .branch((key, value) -> value.contains("B"), ks->ks.to("B"))
   //default branch should not necessarily be defined in the end!
   .defaultBranch(ks->ks.to("C"))
   .onTopOf(builder.stream("source"));

(onTopOf method returns the provided stream so we can continue with method chaining and do something more with the original stream)

Proposed Changes

.default(ks->ks.to("C"));

Here the new KStream#branch() method returns KBranchedStream<K, V> object, which, in turn, contains `branch` and `default` methods.

This is critical that KStream consumers in .branch methods should be invoked immediately during the `branch` methods invocation. This is necessary for the case when we need to gather the streams that were defined in separate scopes back into one scope using auxiliary object:

Code Block
languagejava
@Setter
class CouponIssuer{
   private KStream<....> coffePurchases;
   private KStream<....> electronicsPurchases;

   KStream<...> coupons(){
       return coffePurchases.join(electronicsPurchases...)...
  }
}

CouponIssuer couponIssuer = new CouponIssuer();

transactionStream.branch()
     .branch(predicate1, couponIssuer::setCoffePurchases)
     .branch(predicate2, couponIssuer::setElectronicsPurchases);

KStream<..> coupons = couponIssuer.coupons();


Proposed Changes


Add the new KBranchedStream  class  and branch() method for KStream Add the new org.apache.kafka.streams.kstream.KafkaStreamsBrancher class (see https://github.com/apache/kafka/pull/61646512).

Compatibility, Deprecation, and Migration Plan

The proposed change has no impact on existing code and is backwards compatible. All the old code that uses branchmethod will continue to work, we will just get the new way to perform branching.

Rejected Alternatives

...

Add KStreamsBrancher class that works the same way, but does not require KStream interface modification:

Code Block
languagejava
new KafkaStreamsBrancher<String, String>()
   .branch((key, value) -> value.contains("A"), ks->ks.to("A"))
   .branch((key, value) -> value.contains("B"), ks->ks.to("B"))
   //default branch should not necessarily be defined in the end!
   .defaultBranch(ks->ks.to("C"))
   .onTopOf(builder.stream("source"));