...

JIRA: KAFKA-5488

Motivation

The `KStream.branch` method uses varargs to supply predicates and returns an array of streams ('Each stream in the result array corresponds position-wise (index) to the predicate in the supplied predicates').

This is poor API design: it makes building branches very inconvenient because of the 'impedance mismatch' between arrays and generics in the Java language.

  • In general, the code has poor cohesion: we need to define predicates in one place and the respective stream processors in another. When something changes, we must remember to edit both pieces of code.
  • If the number of predicates is predefined, this method forces us to use 'magic numbers' to extract the right branch from the result (see examples here).
  • If we need to build branches dynamically (e.g. one branch per enum value), we inevitably have to deal with 'generic arrays' and 'unchecked typecasts'.
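The points above can be reproduced without Kafka at all. The following is a minimal sketch, assuming a toy `branch` helper that works on plain lists; `ArrayBranchDemo` and its method are hypothetical names used only for illustration and are not part of Kafka Streams. It shows why an array-returning, varargs-based signature drags in unchecked casts and index-based access.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Toy stand-in for an array-returning branch API. It works on plain lists,
// not on Kafka Streams types, and all names here are hypothetical.
class ArrayBranchDemo {

    // Mimics the shape of a varargs branch method: one output per predicate,
    // matched by position. Each item goes to the first predicate it satisfies;
    // items that match no predicate are dropped.
    @SafeVarargs
    @SuppressWarnings("unchecked") // Java forbids "new List<T>[n]", so a cast is required
    static <T> List<T>[] branch(List<T> source, Predicate<T>... predicates) {
        List<T>[] result = (List<T>[]) new List[predicates.length];
        for (int i = 0; i < result.length; i++) {
            result[i] = new ArrayList<>();
        }
        for (T item : source) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(item)) {
                    result[i].add(item);
                    break; // first match wins
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("A1", "B1", "A2", "C1");
        List<String>[] branches = branch(
                source,
                v -> v.contains("A"),
                v -> v.contains("B"));
        // The predicates are defined above, but the handling happens here.
        // The only link between the two is the pair of magic indexes 0 and 1.
        System.out.println("A branch: " + branches[0]);
        System.out.println("B branch: " + branches[1]);
    }
}
```

Reordering the predicates in the call silently changes the meaning of `branches[0]` and `branches[1]`, which is the cohesion problem described in the first bullet.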

Public Interfaces

The new org.apache.kafka.streams.kstream.KafkaStreamsBrancher class introduces a new standard way to build branches on top of KStream.

...

```java
new KafkaStreamsBrancher<String, String>()
   .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
   .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
   // the default branch does not have to be defined last
   .defaultBranch(ks -> ks.to("C"))
   .onTopOf(builder.stream("source"));
```

(The onTopOf method returns the provided stream, so we can continue method chaining and do something more with the original stream.)
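To make the builder idea concrete, here is a minimal sketch of how such a class could work internally. It is a toy model under stated assumptions, not the implementation from the pull request: `ListBrancher` is a hypothetical name, it branches a plain `List<T>` instead of a KStream, and it assumes first-match-wins routing with unmatched items going to the default branch (or being dropped if none is defined).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical toy model of the builder idea; it branches a List<T>, not a KStream.
class ListBrancher<T> {
    private final List<Predicate<T>> predicates = new ArrayList<>();
    private final List<Consumer<List<T>>> handlers = new ArrayList<>();
    private Consumer<List<T>> defaultHandler; // may stay null: unmatched items are then dropped

    // Registers a predicate together with the code that handles its branch.
    ListBrancher<T> branch(Predicate<T> predicate, Consumer<List<T>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    // Can be called at any point in the chain, not only at the end.
    ListBrancher<T> defaultBranch(Consumer<List<T>> handler) {
        defaultHandler = handler;
        return this;
    }

    // Splits the source, hands every branch to its handler,
    // and returns the source so the caller can keep chaining.
    List<T> onTopOf(List<T> source) {
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i <= predicates.size(); i++) {
            buckets.add(new ArrayList<>()); // the last bucket collects unmatched items
        }
        for (T item : source) {
            int target = predicates.size();
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(item)) {
                    target = i; // first match wins
                    break;
                }
            }
            buckets.get(target).add(item);
        }
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).accept(buckets.get(i));
        }
        if (defaultHandler != null) {
            defaultHandler.accept(buckets.get(predicates.size()));
        }
        return source;
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("A1", "B1", "C1", "A2");
        new ListBrancher<String>()
                .branch(v -> v.contains("A"), items -> System.out.println("A: " + items))
                .defaultBranch(items -> System.out.println("other: " + items))
                .branch(v -> v.contains("B"), items -> System.out.println("B: " + items))
                .onTopOf(source);
    }
}
```

Because each predicate is stored next to its handler, no array of generic types is ever created and no index has to be remembered by the caller.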

Proposed Changes

Add the new org.apache.kafka.streams.kstream.KafkaStreamsBrancher class (see https://github.com/apache/kafka/pull/6164).

...

The proposed change has no impact on existing code and is backwards compatible. All existing code that uses the branch method will continue to work; we simply gain a new way to perform branching.

Rejected Alternatives

None.