THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final StreamJoined<K, V1, V2> streamJoined); <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final StreamJoined<K, V1, V2> streamJoined); <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final StreamJoined<K, V1, V2> streamJoined); |
This KIP will also add a new configuration object StreamJoined.
Code Block | ||||
---|---|---|---|---|
| ||||
public class StreamJoined<K, V1, V2> implements NamedOperation<StreamJoined<K, V1, V2>> { public static <K, V1, V2> StreamJoined<K, V1, V2> as(final WindowBytesStoreSupplier storeSupplier, final WindowBytesStoreSupplier otherSupplier){} public static <K, V1, V2> StreamJoined<K, V1, V2> as(final String storeName) {} public static <K, V1, V2> StreamJoined<K, V1, V2> with(final Serde<K> keySerde, final Serde<V1> valueSerde, final Serde<V2> otherValueSerde) {} // The withName method will name the process and provide the base name // for any repartition topics if required public StreamJoined<K, V1, V2> withName(final String name) {} // The withStoreName is used as the base name for stores provided by Kafka Streams // If users provide state store suppliers, then the name in the store supplier is used public StreamJoined<K, V1, V2> withStoreName(final String storeName) {} public StreamJoined<K, V1, V2> withKeySerde(final Serde<K> keySerde) {} public StreamJoined<K, V1, V2> withValueSerde(final Serde<V1> valueSerde) {} public StreamJoined<K, V1, V2> withOtherValueSerde(final Serde<V2> otherValueSerde) {} public StreamJoined<K, V1, V2> withStoreSupplier(final WindowBytesStoreSupplier storeSupplier) {} public StreamJoined<K, V1, V2> withOtherStoreSupplier(final WindowBytesStoreSupplier otherStoreSupplier) {} public StreamJoined<K, V1, V2> withLogging(final boolean loggingEnabled) {} public StreamJoined<K, V1, V2> withCaching(final boolean cachingEnabled) {} public StreamJoined<K, V1, V2> withTopicConfig(final Map<String, String> topicConfig) {} public StreamJoined<K, V1, 
V2> withRetention(final Duration retention) throws IllegalArgumentException {} |
...