...

In KIP-307 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL), we decided to extend the base name that users provide for the repartition topic (from KIP-372) to also name the join operator and its state stores. This changes the names of the changelog topics if users have not already named the state stores via a Materialized object. Naming the state stores of a join does not expose them for Interactive Queries (IQ), but it is still recommended practice, because it keeps the changelog topic names stable in the face of upstream changes in the topology.

One point we overlooked when discussing KIP-307 is that the KStream#join method does not take a Materialized object; thus, users could never name the state stores, and hence the changelog topics. Due to the scope of the changes in KIP-307, the implementation was broken up over several PRs, and the first two were included in the 2.3 release. Having a partially implemented KIP is not a problem in and of itself, as only the internal classes needed for naming operators were added. However, because KStream#join does not take a Materialized object, if users elect to name the Join processor (and hence the repartition topics), then once 2.4 is released the provided name will automatically extend to the state stores of the Join and change the changelog topic names as well. This sudden state store name change is a breaking change that might catch users by surprise.
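To illustrate why store names matter for changelog topics, here is a simplified, hypothetical model (not Kafka Streams code). It assumes that auto-generated store names embed a zero-padded, topology-wide operator index (as in names such as KSTREAM-JOINTHIS-0000000004-store) and that changelog topics follow the "<application.id>-<storeName>-changelog" pattern. The application ID, operator indexes, and user-provided store name are made up for illustration.

```java
// Simplified, hypothetical model of changelog topic naming; not Kafka Streams code.
// Assumptions: auto-generated store names embed a zero-padded, topology-wide operator
// index, and changelog topics are named "<application.id>-<storeName>-changelog".
final class ChangelogNameModel {

    // Auto-generated name: depends on where the operator sits in the topology.
    static String generatedStoreName(String prefix, int operatorIndex) {
        return String.format("%s-%010d-store", prefix, operatorIndex);
    }

    // The changelog topic name is derived directly from the store name.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        String appId = "orders-app"; // hypothetical application.id
        // Join store generated at operator index 4.
        System.out.println(changelogTopic(appId, generatedStoreName("KSTREAM-JOINTHIS", 4)));
        // Same join after one operator is added upstream: the index, and so the topic, shifts.
        System.out.println(changelogTopic(appId, generatedStoreName("KSTREAM-JOINTHIS", 5)));
        // A user-named store keeps the same changelog topic regardless of upstream changes.
        System.out.println(changelogTopic(appId, "orders-join-store")); // hypothetical name
    }
}
```

Under these assumptions the first two topic names differ only because the join's operator index shifted, while the third does not depend on the join's position in the topology. The same mechanism explains the 2.4 surprise: if a processor name silently becomes the store name, the changelog topic name changes even though the topology did not.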

...

Methods Added to KStream:

```java
<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                             final JoinWindows windows,
                             final StreamJoined<K, V, VO> streamJoined);

<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final JoinWindows windows,
                                 final StreamJoined<K, V, VO> streamJoined);

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                  final JoinWindows windows,
                                  final StreamJoined<K, V, VO> streamJoined);
```

This KIP will also add a new configuration object StreamJoined.

StreamJoined:

```java
public class StreamJoined<K, V1, V2> implements NamedOperation<StreamJoined<K, V1, V2>> {

    public static <K, V1, V2> StreamJoined<K, V1, V2> with(final WindowBytesStoreSupplier storeSupplier,
                                                            final WindowBytesStoreSupplier otherSupplier) {}

    public static <K, V1, V2> StreamJoined<K, V1, V2> as(final String storeName) {}

    public static <K, V1, V2> StreamJoined<K, V1, V2> with(final Serde<K> keySerde,
                                                            final Serde<V1> valueSerde,
                                                            final Serde<V2> otherValueSerde) {}

    // The withName method names the processor and provides the base name
    // for any repartition topics, if required
    public StreamJoined<K, V1, V2> withName(final String name) {}

    // The withStoreName method provides the base name for the stores created by Kafka Streams.
    // If users provide state store suppliers, the names of the suppliers are used instead
    public StreamJoined<K, V1, V2> withStoreName(final String storeName) {}

    public StreamJoined<K, V1, V2> withKeySerde(final Serde<K> keySerde) {}

    public StreamJoined<K, V1, V2> withValueSerde(final Serde<V1> valueSerde) {}

    public StreamJoined<K, V1, V2> withOtherValueSerde(final Serde<V2> otherValueSerde) {}

    public StreamJoined<K, V1, V2> withThisStoreSupplier(final WindowBytesStoreSupplier storeSupplier) {}

    public StreamJoined<K, V1, V2> withOtherStoreSupplier(final WindowBytesStoreSupplier otherStoreSupplier) {}

    public StreamJoined<K, V1, V2> withLogging(final boolean loggingEnabled) {}

    public StreamJoined<K, V1, V2> withCaching(final boolean cachingEnabled) {}

    public StreamJoined<K, V1, V2> withTopicConfig(final Map<String, String> topicConfig) {}

    public StreamJoined<K, V1, V2> withRetention(final Duration retention) throws IllegalArgumentException {}
}
```
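The fluent with* methods suggest a configuration object that is built up step by step. The sketch below is a hypothetical, stripped-down stand-in (StreamJoinedSketch is not a Kafka class) that assumes an immutable design, where every with* call returns a new instance. It models only the processor name, the store base name, and logging, to show that withName and withStoreName (or as) set independent fields.

```java
import java.util.Objects;

// Hypothetical, stripped-down stand-in for StreamJoined; not the Kafka Streams class.
// Assumes an immutable design in which every with* call returns a new instance.
final class StreamJoinedSketch {
    final String name;             // processor / repartition-topic base name (withName)
    final String storeName;        // base name for the join's state stores (as / withStoreName)
    final boolean loggingEnabled;  // applied to both join stores

    private StreamJoinedSketch(String name, String storeName, boolean loggingEnabled) {
        this.name = name;
        this.storeName = storeName;
        this.loggingEnabled = loggingEnabled;
    }

    // Mirrors StreamJoined#as(storeName): only the store base name is set;
    // logging starts enabled, as for other Kafka Streams state stores.
    static StreamJoinedSketch as(String storeName) {
        return new StreamJoinedSketch(null, Objects.requireNonNull(storeName, "storeName"), true);
    }

    StreamJoinedSketch withName(String name) {
        return new StreamJoinedSketch(Objects.requireNonNull(name, "name"), storeName, loggingEnabled);
    }

    StreamJoinedSketch withStoreName(String storeName) {
        return new StreamJoinedSketch(name, Objects.requireNonNull(storeName, "storeName"), loggingEnabled);
    }

    StreamJoinedSketch withLogging(boolean enabled) {
        return new StreamJoinedSketch(name, storeName, enabled);
    }
}
```

For example, `StreamJoinedSketch.as("orders").withName("orders-join")` names the processor without touching the store base name, and the original `as("orders")` instance stays unchanged, so a base configuration can be reused safely across joins.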


Proposed Changes

With this in mind, this KIP aims to add a new configuration object, StreamJoined. StreamJoined allows users to specify Serdes for the join, name the join processor, set the base name of the default state stores, and provide store suppliers. It is essentially a merger of the Joined and Materialized configuration objects. We'll add an overloaded KStream#join method that accepts a StreamJoined parameter instead of a Joined parameter. The overloads will apply to all flavors of KStream#join (join, leftJoin, and outerJoin). Because of the significant overlap with the new StreamJoined configuration, we'll also deprecate the Joined configuration object and the methods using Joined, including the KStream join methods that take a Joined parameter.

This allows users to upgrade to 2.4 without having to make a breaking change. The Joined#as method will now name only the Join processor and the repartition topics; it will no longer name the associated state stores and changelog topics. So if users upgrade to 2.4 and don't provide a name for the state stores, the join will continue to use the same auto-generated names as in 2.3. If users do elect to name the state stores, they can do so via StreamJoined#as("store name"). Naming the stores is recommended to pin the changelog topic names in case of any upstream topology changes.

...

  • The StreamJoined#withName method will provide only the name of the join processor and the base name of any required repartition topics. If StreamJoined#withName is not used, Kafka Streams will auto-generate names for the join processor and any repartition topics.
  • The StreamJoined#withStoreName method will provide only the base name for the state stores of the join. If users pass in StoreSupplier(s), the name of each StoreSupplier is used instead; StoreSupplier names always take priority over names coming from StreamJoined#withStoreName.
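The precedence rules above can be expressed as a small resolution function. This is a hypothetical model (not Kafka Streams code): the "-this-join-store" and "-other-join-store" suffixes and the GENERATED_* placeholders are assumptions standing in for whatever names Kafka Streams actually derives, and repartition topic naming is left out.

```java
// Hypothetical model of the naming rules listed above; not Kafka Streams code.
// The "-this-join-store"/"-other-join-store" suffixes and the GENERATED_* placeholders
// are assumptions standing in for whatever names Kafka Streams actually derives.
final class JoinNamingModel {

    static final String GENERATED_PROCESSOR = "<generated-join-processor>";
    static final String GENERATED_THIS = "<generated-this-store>";
    static final String GENERATED_OTHER = "<generated-other-store>";

    // Resolved names for one stream-stream join.
    static final class Names {
        final String processor;
        final String thisStore;
        final String otherStore;

        Names(String processor, String thisStore, String otherStore) {
            this.processor = processor;
            this.thisStore = thisStore;
            this.otherStore = otherStore;
        }
    }

    // Precedence for one store: supplier name, then withStoreName base name, then generated.
    static String storeName(String supplierName, String baseName, String suffix, String generated) {
        if (supplierName != null) {
            return supplierName;
        }
        if (baseName != null) {
            return baseName + suffix;
        }
        return generated;
    }

    // withName only names the processor (and repartition topics, not modeled here);
    // it never feeds into the store names, so naming a join cannot rename its changelogs.
    static Names resolve(String withName, String withStoreName,
                         String thisSupplierName, String otherSupplierName) {
        String processor = withName != null ? withName : GENERATED_PROCESSOR;
        return new Names(processor,
                storeName(thisSupplierName, withStoreName, "-this-join-store", GENERATED_THIS),
                storeName(otherSupplierName, withStoreName, "-other-join-store", GENERATED_OTHER));
    }
}
```

For instance, `resolve("orders-join", null, null, null)` names the processor but leaves both store names auto-generated, which mirrors the upgrade guarantee for users who named the join in 2.3.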

...

  • Users can provide 0, 1 or 2 StoreSupplier instances.
    • The provided StoreSupplier instances must implement WindowBytesStoreSupplier.
    • In the case of providing zero StoreSuppliers, Kafka Streams will create two persistent state stores, as it does now. The names for the stores will be auto-generated unless the user gives a name with the StreamJoined#withStoreName method.
    • In the case of providing one StoreSupplier, Kafka Streams will create one persistent state store, and the StoreSupplier will create the other state store. The name for the default store will be auto-generated unless the user gives a name with the StreamJoined#withStoreName method.
    • When providing two StoreSuppliers, the state stores for the join will come from the suppliers, and the names will come from the names given to the suppliers.
  • There will be a runtime check to ensure that the values of JoinWindows match those of the provided StoreSuppliers.
  • When using any store configuration methods (disabling logging or caching, setting topic configurations), Kafka Streams will apply these changes to both stores. While this may seem strict, both stores participate in the join, so it seems prudent to configure them the same way.
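The supplier rules above (zero, one, or two suppliers, a window check, and store configuration applied to both stores) can be sketched as follows. This is a hypothetical model, not Kafka Streams code; SupplierSpec stands in for the settings of a WindowBytesStoreSupplier, and the exact fields the runtime check compares are an assumption made for illustration: window size equal to JoinWindows before + after, and retention equal to that size plus the grace period.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the 0/1/2 StoreSupplier rules above; not Kafka Streams code.
// Assumption for illustration: the runtime check compares window size (before + after)
// and retention period (size + grace) of JoinWindows against each supplier.
final class JoinStorePlan {

    // Stand-in for the settings carried by a WindowBytesStoreSupplier.
    static final class SupplierSpec {
        final String name;
        final long windowSizeMs;
        final long retentionMs;

        SupplierSpec(String name, long windowSizeMs, long retentionMs) {
            this.name = name;
            this.windowSizeMs = windowSizeMs;
            this.retentionMs = retentionMs;
        }
    }

    static void checkWindows(SupplierSpec s, long beforeMs, long afterMs, long graceMs) {
        long size = beforeMs + afterMs;
        if (s.windowSizeMs != size || s.retentionMs != size + graceMs) {
            throw new IllegalArgumentException(
                    "Window settings of supplier " + s.name + " do not match JoinWindows");
        }
    }

    // Describes the two join stores. A null supplier means Kafka Streams creates a default
    // persistent store; store configuration (here, logging) is applied to both stores.
    static List<String> plan(SupplierSpec thisSupplier, SupplierSpec otherSupplier,
                             long beforeMs, long afterMs, long graceMs, boolean loggingEnabled) {
        List<String> stores = new ArrayList<>();
        for (SupplierSpec s : new SupplierSpec[] {thisSupplier, otherSupplier}) {
            String source;
            if (s == null) {
                source = "default persistent store";
            } else {
                checkWindows(s, beforeMs, afterMs, graceMs);
                source = "supplier " + s.name;
            }
            stores.add(source + (loggingEnabled ? " (logged)" : " (not logged)"));
        }
        return stores;
    }
}
```

Under these assumptions, a join with 5-minute before/after windows and a 1-minute grace period needs each supplier to report a 10-minute window size and an 11-minute retention; anything else fails fast instead of producing a join that silently drops records.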

Compatibility, Deprecation, and Migration Plan

...

  • For users who have not named the Join repartition node, there are no migration concerns.
  • For users who have previously named the Join repartition node and want to upgrade to 2.4, a rolling restart should be possible, since the store names will not change.
  • Users who elect to name the state stores will need to stop all instances and redeploy the new code, then reset the application before starting it, to ensure no data is left unprocessed.
  • We'll deprecate the Joined config and the stream-stream join methods that use a Joined object. These items will be removed in a later major release.
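The migration guidance above reduces to a small decision table. The helper below is hypothetical (not part of Kafka Streams) and simply encodes the three cases: only a change in store names, and therefore in changelog topic names, forces a full stop and reset.

```java
// Hypothetical helper that encodes the migration guidance above; not part of Kafka Streams.
final class JoinUpgradePlanner {

    enum Plan { NO_MIGRATION_CONCERNS, ROLLING_RESTART, STOP_REDEPLOY_AND_RESET }

    // repartitionNodeNamed: the 2.3 topology already named the join (e.g. via Joined#as).
    // addingStoreName: the new code names the join's state stores via StreamJoined.
    static Plan plan(boolean repartitionNodeNamed, boolean addingStoreName) {
        if (addingStoreName) {
            // New store names mean new changelog topics: stop all instances,
            // deploy the new code, and reset the application before starting it.
            return Plan.STOP_REDEPLOY_AND_RESET;
        }
        // Store names are unchanged in 2.4, so the existing changelog topics are reused.
        return repartitionNodeNamed ? Plan.ROLLING_RESTART : Plan.NO_MIGRATION_CONCERNS;
    }
}
```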

...