Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-5488

Motivation

The KStream.branch method takes predicates as varargs and returns an array of streams ('Each stream in the result array corresponds position-wise (index) to the predicate in the supplied predicates').

This is poor API design that makes building branches very inconvenient because of the 'impedance mismatch' between arrays and generics in the Java language.

  • In general, the code has poor cohesion: we need to define predicates in one place and the respective stream processors in another. When something changes, we must remember to edit both pieces of code.
  • If the number of predicates is predefined, this method forces us to use 'magic numbers' to extract the right branch from the result (see examples here).
  • If we need to build branches dynamically (e.g. one branch per enum value), we inevitably have to deal with 'generic arrays' and 'unchecked typecasts'.
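
The last two points can be reproduced without Kafka at all. The following is a minimal, self-contained sketch in plain Java. The `branch` helper and the class name are hypothetical and only mimic the shape of KStream.branch (varargs of predicates in, position-indexed array out) over an in-memory list, assuming first-match semantics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class ArrayBranchSketch {

    // Hypothetical stand-in for the shape of KStream.branch: varargs in, array out.
    // Each element goes to the first predicate it matches; unmatched elements are dropped.
    @SafeVarargs
    @SuppressWarnings("unchecked") // Java forbids "new List<T>[n]", so a raw array must be cast
    static <T> List<T>[] branch(List<T> source, Predicate<T>... predicates) {
        List<T>[] result = (List<T>[]) new List[predicates.length];
        for (int i = 0; i < result.length; i++) {
            result[i] = new ArrayList<>();
        }
        for (T item : source) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(item)) {
                    result[i].add(item);
                    break; // first match wins
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> source = List.of("A1", "B1", "A2", "C1");

        List<String>[] branches = branch(source,
                v -> v.contains("A"),
                v -> v.contains("B"));

        // Magic numbers: only the position ties index 0 to the "A" predicate.
        System.out.println(branches[0]);
        System.out.println(branches[1]);
    }
}
```

Reordering the predicates silently changes what `branches[0]` means, and the unchecked cast cannot be avoided as long as the result is an array of a generic type.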

Public Interfaces

In accordance with KStreams DSL Grammar, we introduce the following new elements:

  • split DSLOperation
  • KBranchedStream DSLObject with following DSLOperations:
    • branch
    • defaultBranch
    • noDefaultBranch
  • Branched DSLParameter.

Description

1. The split(Named named) operation returns a KBranchedStream. The Named parameter lets one name the branch operator itself, so that all the branches can then get index-suffixed names built from the branch operator name.

The overloaded parameterless alternative `split()` is also available.

2. KBranchedStream has the following methods:

  • KBranchedStream branch(Predicate<K,V> predicate, Branched<K,V> branched) -- creates a branch for messages that match the predicate and returns `this` in order to facilitate method chaining.
  • Map<String, KStream> defaultBranch(Branched<K,V> branched) -- creates a default branch (for messages not intercepted by other branches) and returns the dictionary of named KStreams.
  • Map<String, KStream> noDefaultBranch() -- returns the dictionary of named KStreams.

Both branch and defaultBranch operations also have overloaded parameterless alternatives.

3. Branched parameter extends NamedOperation and has the following builder methods:

  • withChain(Function<KStream<K, V>, KStream<K, V>> chain) -- a chain of operations applied to the given branch. By default, it is the identity function s->s. It can be more complex, such as s->s.mapValues.... etc.
  • withName(String name) -- sets the name of the branch. By default the name is auto-generated; when the split operation is named, the generated names are index-suffixed.
  • withConsumer(Consumer<? super KStream<K, V>> consumer) -- directs the final result of chain to a given consumer. By default, a NO-OP consumer is provided.

The Map returned by defaultBranch/noDefaultBranch allows us to collect all the KStream branch objects in a single scope. The branches collected are the results of the transformations defined by the `withChain` functions.
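
To illustrate the intended flow (chained `branch` calls, then a terminal call that returns a name-to-branch Map), here is a minimal in-memory model in plain Java. It is only a sketch and not the Kafka Streams implementation: `SplitSketch` and `BranchedList` are hypothetical names, lists stand in for streams, names are passed explicitly instead of through a Branched parameter, and it assumes that a record goes to the first branch whose predicate matches and that `noDefaultBranch` drops unmatched records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

public class SplitSketch {

    /** Hypothetical in-memory model of KBranchedStream; NOT the Kafka Streams class. */
    static final class BranchedList<V> {
        private final List<V> source;
        private final List<String> names = new ArrayList<>();
        private final List<Predicate<V>> predicates = new ArrayList<>();
        private final List<UnaryOperator<List<V>>> chains = new ArrayList<>();

        BranchedList(List<V> source) {
            this.source = source;
        }

        // Registers a named branch and returns this, so that calls can be chained.
        BranchedList<V> branch(String name, Predicate<V> predicate, UnaryOperator<List<V>> chain) {
            names.add(name);
            predicates.add(predicate);
            chains.add(chain);
            return this;
        }

        // Terminal operation: unmatched elements go to the default branch.
        Map<String, List<V>> defaultBranch(String name) {
            return build(name);
        }

        // Terminal operation: unmatched elements are dropped (an assumption of this sketch).
        Map<String, List<V>> noDefaultBranch() {
            return build(null);
        }

        private Map<String, List<V>> build(String defaultName) {
            List<List<V>> buckets = new ArrayList<>();
            for (int i = 0; i < predicates.size(); i++) {
                buckets.add(new ArrayList<>());
            }
            List<V> rest = new ArrayList<>();
            for (V value : source) {
                int i = 0;
                while (i < predicates.size() && !predicates.get(i).test(value)) {
                    i++;
                }
                if (i < predicates.size()) {
                    buckets.get(i).add(value); // first matching branch wins
                } else {
                    rest.add(value);
                }
            }
            // The map holds the results of the chains, not the raw branches.
            Map<String, List<V>> result = new LinkedHashMap<>();
            for (int i = 0; i < names.size(); i++) {
                result.put(names.get(i), chains.get(i).apply(buckets.get(i)));
            }
            if (defaultName != null) {
                result.put(defaultName, rest);
            }
            return result;
        }
    }

    static <V> BranchedList<V> split(List<V> source) {
        return new BranchedList<>(source);
    }

    public static void main(String[] args) {
        Map<String, List<String>> branches = split(Arrays.asList("a", null, "b"))
                .branch("null", v -> v == null,
                        vs -> vs.stream().map(v -> "NULL").collect(Collectors.toList()))
                .defaultBranch("non-null");

        System.out.println(branches.get("null"));
        System.out.println(branches.get("non-null"));
    }
}
```

Because every branch is looked up by name, adding or reordering `branch` calls does not change how the other branches are retrieved.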

Simple Example: Direct Branch Consuming

source.split()
.branch((key, value) -> value.contains("A"), Branched.withConsumer(ks->ks.to("A")))
.branch((key, value) -> value.contains("B"), Branched.withConsumer(ks->ks.to("B")))
.defaultBranch(Branched.withConsumer(ks->ks.to("C")));

More Complex Example: Merging Branches

Map<String, KStream<String, String>> branches = source.split()
.branch((key, value) -> value == null, Branched.withName("null").withChain(s->s.mapValues(v->"NULL")))
.defaultBranch(Branched.withName("non-null"));
branches.get("non-null").merge(branches.get("null"));

Dynamic Branching

There is also the case when one needs to create branches dynamically, e.g. one per enum value. This can be implemented in the following way:

KBranchedStream branched = stream.split();
for (RecordType recordType : RecordType.values())
branched.branch((k, v) -> v.getRecType() == recordType,
Branched.withConsumer(recordType::processRecords));


This is why the 'starting' `split()` operation is necessary, and why it is better to have it than to add a new `branch` method to `KStream` directly.

Proposed Changes


Add the new KBranchedStream class and the split() method for KStream (see https://github.com/apache/kafka/pull/6512).

Compatibility, Deprecation, and Migration Plan

The proposed change is backwards compatible. The old KStream#branch method should be deprecated.

Rejected Alternatives

  1. A KafkaStreamsBrancher class that works the same way, but does not require KStream interface modification:
new KafkaStreamsBrancher<String, String>()
   .branch((key, value) -> value.contains("A"), ks->ks.to("A"))
   .branch((key, value) -> value.contains("B"), ks->ks.to("B"))
   //the default branch does not have to be defined last!
   .defaultBranch(ks->ks.to("C"))
   .onTopOf(builder.stream("source"));

Rejected because it violates method chaining.

2.

source
   .split()
   .branch((key, value) -> value.contains("A"), ks->ks.to("A"))
   .branch((key, value) -> value.contains("B"), ks->ks.to("B"))
   .defaultBranch(ks->ks.to("C"));

Here the new KStream#split() method returns a KBranchedStream<K, V> object, which, in turn, contains the `branch` and `defaultBranch` methods. It is critical that the KStream consumers passed to the `branch` methods are invoked immediately, during the `branch` method invocation. This is necessary for the case when we need to gather streams that were defined in separate scopes back into one scope using an auxiliary object:

@Setter
class CouponIssuer{
   private KStream<....> coffePurchases;
   private KStream<....> electronicsPurchases;

   KStream<...> coupons(){
       return coffePurchases.join(electronicsPurchases...)...
  }
}

CouponIssuer couponIssuer = new CouponIssuer();

transactionStream.split()
     .branch(predicate1, couponIssuer::setCoffePurchases)
     .branch(predicate2, couponIssuer::setElectronicsPurchases);

KStream<..> coupons = couponIssuer.coupons();

This was rejected because of the difficulty of having branches in the same scope.
