Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleMethods Added to KStream
<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                             final JoinWindows windows,
                             final StreamJoined<K, V1, V2, S1 extends StateStore, S2 extends StateStore>V2> streamJoined);

<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final JoinWindows windows,
                                 final StreamJoined<K, V1, V2, S1 extends StateStore, S2 extends StateStore> V2> streamJoined);

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                  final JoinWindows windows,
                                  final StreamJoined<K, V1, V2, S1 extends StateStore, S2 extends StateStore> V2> streamJoined);

This KIP will also add a new configuration object StreamJoined.

Code Block
languagejava
titleStreamJoined
public class StreamJoined<K, V1, V2, S1 extends StateStore, S2 extends StateStore> V2> implements NamedOperation<StreamJoined<K, V1, V2, S1, S2>>V2>> {

public static <K, V1, V2> StreamJoined<K, V1, V2, WindowStore<Bytes, byte[]>, WindowStore<Bytes, byte[]>> V2> as(final WindowBytesStoreSupplier storeSupplier,
                                                                                                             final WindowBytesStoreSupplier otherSupplier){}

public static <K, V1, V2, S1 extends StateStore, S2 extends StateStore> V2> StreamJoined<K, V1, V2, S1, S2>V2> as(final String storeName) {}

public static <K, V1, V2, S1 extends StateStore, S2 extends StateStore> V2> StreamJoined<K, V1, V2, S1, S2>V2> with(final Serde<K> keySerde,
                                                                                                             final Serde<V1> valueSerde,
                                                                                                             final Serde<V2> otherValueSerde) {}
 // The withName method will name the process and provide the base name 
 // for any repartition topics if required

 public StreamJoined<K, V1, V2, S1, S2> V2> withName(final String name) {}

 // The withStoreName is used as the base name for stores provided by Kafka Streams
 // If users provide state store suppliers, then the name in the store supplier is used
 public StreamJoined<K, V1, V2, S1, S2>V2> withStoreName(final String storeName) {}

 public StreamJoined<K, V1, V2, S1, S2> V2> withKeySerde(final Serde<K> keySerde) {}

 public StreamJoined<K, V1, V2, S1, S2> V2> withValueSerde(final Serde<V1> valueSerde) {}

 public StreamJoined<K, V1, V2, S1, S2> V2> withOtherValueSerde(final Serde<V2> otherValueSerde) {}

 public StreamJoined<K, V1, V2, WindowStore<Bytes, byte[]>, S2> V2> withStoreSupplier(final WindowBytesStoreSupplier storeSupplier) {}

 public StreamJoined<K, V1, V2, S1, WindowStore<Bytes, byte[]>> V2> withOtherStoreSupplier(final WindowBytesStoreSupplier otherStoreSupplier) {}

 public StreamJoined<K, V1, V2, S1, S2>V2> withLogging(final boolean loggingEnabled) {}

 public StreamJoined<K, V1, V2, S1, S2> V2> withCaching(final boolean cachingEnabled) {}

 public StreamJoined<K, V1, V2, S1, S2> V2> withTopicConfig(final Map<String, String> topicConfig) {}

 public StreamJoined<K, V1, V2, S1, S2>V2> withRetention(final Duration retention) throws IllegalArgumentException {}

...