Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-5765

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Merging multiple KStreams is currently done via StreamsBuilder's merge() (formerly KStreamBuilder's merge()). This is unnatural: merging is an operation on streams, so it should be done via KStream's merge().

Public Interfaces

We will add a merge() method to the KStream interface.

KStream<K,V> merge(KStream<K,V> stream);

Proposed Changes

Originally, merge() was defined in StreamsBuilder. Through a series of calls, it invoked a static merge() method in KStreamImpl, separate from the one we are adding:

static <K,V> KStream<K,V> merge(InternalStreamsBuilder builder, KStream<K,V>[] streams)

In the new structure, we will add an overriding merge() method to KStreamImpl:

@Override
public KStream<K,V> merge(KStream<K,V> stream) {
    // builder (type: InternalStreamsBuilder) is defined in the AbstractStream superclass
    return this.builder.merge(stream);
}

The new method no longer accepts a variable number of streams in one call; it takes exactly one stream as its input argument. Merging more than two streams is done by chaining calls.
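To illustrate the single-argument form, here is a minimal, Kafka-free sketch of how chained calls would replace one varargs call. MiniStream is a hypothetical stand-in for KStream and is not part of Kafka. As a simplifying assumption, its merge() just concatenates records, whereas a real merged stream gives no ordering guarantee between records of the two inputs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for KStream; it only models the shape of merge().
final class MiniStream<T> {
    private final List<T> records;

    MiniStream(List<T> records) {
        this.records = Collections.unmodifiableList(new ArrayList<>(records));
    }

    // Single-argument merge, mirroring the proposed KStream merge(KStream) signature.
    // Assumption: concatenation is used here purely for illustration.
    MiniStream<T> merge(MiniStream<T> other) {
        List<T> combined = new ArrayList<>(records);
        combined.addAll(other.records);
        return new MiniStream<>(combined);
    }

    List<T> records() {
        return records;
    }
}

final class MergeChainSketch {
    public static void main(String[] args) {
        MiniStream<Integer> a = new MiniStream<>(List.of(1));
        MiniStream<Integer> b = new MiniStream<>(List.of(2));
        MiniStream<Integer> c = new MiniStream<>(List.of(3));

        // Three streams are merged by chaining two single-argument calls.
        MiniStream<Integer> merged = a.merge(b).merge(c);
        System.out.println(merged.records()); // prints [1, 2, 3]
    }
}
```

Each call returns a new stream, so the chain reads left to right and no array of streams is needed.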

Compatibility, Deprecation, and Migration Plan

We will remove the old merge() method inside StreamsBuilder. Classes (or systems) calling the old method will have to switch to calling the new one in KStreamImpl, which is exposed through the KStream interface.
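Callers that currently pass several streams in one call may find a small fold helper convenient during migration. The sketch below is only an illustration: mergeAll is a hypothetical helper, not part of Kafka, and it uses strings in place of streams so that it runs without Kafka on the classpath.

```java
import java.util.function.BinaryOperator;

final class MergeMigrationSketch {
    // Hypothetical helper: folds any number of inputs using a single-argument merge.
    @SafeVarargs
    static <S> S mergeAll(BinaryOperator<S> merge, S first, S... rest) {
        S result = first;
        for (S next : rest) {
            result = merge.apply(result, next);
        }
        return result;
    }

    public static void main(String[] args) {
        // String::concat stands in for the stream merge operation.
        String merged = mergeAll(String::concat, "a", "b", "c");
        System.out.println(merged); // prints abc
    }
}
```

Assuming the merge() signature proposed above, a call such as mergeAll(KStream::merge, s1, s2, s3) should behave like s1.merge(s2).merge(s3).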

Rejected Alternatives

There were plans to use a merge() method of the following form:

<K,V> KStream<K,V> merge(StreamsBuilder builder, KStream<K,V> ... streams)

However, the builder parameter is redundant, as the local InternalStreamsBuilder is sufficient.

 

 
