...

JIRA: KAFKA-5488

Motivation

The KStream#branch method uses varargs to supply predicates and returns an array of streams ('Each stream in the result array corresponds position-wise (index) to the predicate in the supplied predicates').

This is poor API design: the 'impedance mismatch' between arrays and generics in the Java language makes building branches very inconvenient.
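The mismatch can be illustrated without Kafka at all. The sketch below is a toy model, not the Kafka Streams implementation: `ArrayBranchSketch` and its `branch` helper are hypothetical names, and a `List<T>` stands in for a stream. It shows that a varargs method returning an array of a generic type needs a raw array plus an unchecked cast, and that callers have to track branches by index.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class ArrayBranchSketch {

    // Hypothetical stand-in for an array-returning branch method: each element
    // goes to the first matching predicate, and the result holds one list per
    // predicate, matched by position.
    @SafeVarargs
    @SuppressWarnings("unchecked")
    static <T> List<T>[] branch(List<T> input, Predicate<? super T>... predicates) {
        // Java cannot create an array of a generic type directly,
        // hence the raw array and the unchecked cast.
        List<T>[] result = (List<T>[]) new List[predicates.length];
        for (int i = 0; i < result.length; i++) {
            result[i] = new ArrayList<>();
        }
        for (T item : input) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(item)) {
                    result[i].add(item);
                    break; // first match wins; unmatched items are dropped
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer>[] branches = branch(Arrays.asList(1, 2, 3, 4, 5, 6),
                n -> n % 2 == 0,
                n -> n > 3);
        // The caller must remember which index belongs to which predicate.
        System.out.println(branches[0]); // [2, 4, 6]
        System.out.println(branches[1]); // [5]
    }
}
```

Nothing in the types ties `branches[1]` to the second predicate, so reordering the predicates silently changes what each index means.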

...

Code Block
languagejava
BranchedKStream branched = stream.split();
for (RecordType recordType : RecordType.values())
  branched.branch((k, v) -> v.getRecType() == recordType,
    Branched.with(recordType::processRecords));

...

Code Block
languagejava
RecordType[] recordTypes = RecordType.values();
if (recordTypes.length != 0) {
  BranchedKStream branched = stream
    .branch((k, v) -> v.getRecType() == recordTypes[0],
       Branched.with(recordTypes[0]::processRecords));

  for (int i = 1; i < recordTypes.length; i++) {
    // Copy to a local: a lambda may only capture effectively final variables.
    RecordType recordType = recordTypes[i];
    branched.branch((k, v) -> v.getRecType() == recordType,
      Branched.with(recordType::processRecords));
  }
}

...

Code Block
languagejava
BranchedKStream<K,V> split();
BranchedKStream<K,V> split(Named n);

2. Deprecate the existing KStream#branch method.

3. Add and implement the following Branched class:

Code Block
languagejava
class Branched<K, V> implements Named<Branched<K,V>> {
    static Branched<K, V> as(String name);
    static Branched<K, V> with(Function<? super KStream<? super K, ? super V>, ? extends KStream<? extends K, ? extends V>> chain);
    static Branched<K, V> with(Function<? super KStream<? super K, ? super V>, ? extends KStream<? extends K, ? extends V>> chain, String name);
}
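How a Branched value might pair an optional name with an optional chain function can be sketched with plain Java collections. Everything below is an assumption made for illustration: `ToyBranched` and `ToySplit` are hypothetical stand-ins, a `List<T>` plays the role of a KStream, and the "first matching branch wins" semantics are assumed rather than taken from this proposal.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

public class BranchedSketch {

    // Hypothetical stand-in for Branched<K, V>; a List<T> plays the role of a KStream.
    static final class ToyBranched<T> {
        final String name;                                        // may be null
        final Function<? super List<T>, ? extends List<T>> chain; // may be null

        private ToyBranched(String name, Function<? super List<T>, ? extends List<T>> chain) {
            this.name = name;
            this.chain = chain;
        }

        static <T> ToyBranched<T> as(String name) {
            return new ToyBranched<>(name, null);
        }

        static <T> ToyBranched<T> with(Function<? super List<T>, ? extends List<T>> chain) {
            return new ToyBranched<>(null, chain);
        }

        static <T> ToyBranched<T> with(Function<? super List<T>, ? extends List<T>> chain, String name) {
            return new ToyBranched<>(name, chain);
        }
    }

    // Hypothetical stand-in for the object returned by split().
    static final class ToySplit<T> {
        private List<T> remaining;

        ToySplit(List<T> input) {
            this.remaining = new ArrayList<>(input);
        }

        // Assumed semantics: an element goes to the first branch whose predicate matches.
        ToySplit<T> branch(Predicate<? super T> predicate, ToyBranched<T> branched) {
            List<T> matched = new ArrayList<>();
            List<T> rest = new ArrayList<>();
            for (T item : remaining) {
                if (predicate.test(item)) {
                    matched.add(item);
                } else {
                    rest.add(item);
                }
            }
            remaining = rest;
            if (branched.chain != null) {
                branched.chain.apply(matched); // hand the branch to its chain function
            }
            return this;
        }
    }

    static Map<String, List<String>> demo() {
        Map<String, List<String>> seen = new LinkedHashMap<>();
        ToySplit<String> split =
            new ToySplit<>(Arrays.asList("apple", "avocado", "banana", "cherry"));
        // Branches are added in a loop; no arrays of generic types are involved.
        for (String prefix : Arrays.asList("a", "b")) {
            split.branch(s -> s.startsWith(prefix),
                ToyBranched.with(branch -> { seen.put(prefix, branch); return branch; },
                                 "starts-with-" + prefix));
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // {a=[apple, avocado], b=[banana]}
    }
}
```

In this toy model each branch is processed where it is declared, so the number of branches can be decided at run time without indexing into a result array.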

4. Add and implement the following BranchedKStream interface:

...