Status

Current state: Accepted

Discussion thread: here

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In the 2.2 release of Kafka Streams (KIP-372 https://cwiki.apache.org/confluence/display/KAFKA/KIP-372%3A+Naming+Repartition+Topics+for+Joins+and+Grouping), we added the Grouped class, which lets users name the repartition topic for aggregation operations. In that KIP, we also piggybacked a change that lets users name the repartition topics of join operations.

...

While a StoreSupplier returns a unique StateStore instance with each invocation of StoreSupplier.get(), the store names need to be unique as well. Hence we need to give users a way to 1) name the state stores so that the names do not shift with upstream topology changes, or 2) fully customize the join by supplying their own StoreSupplier instances. While we could add another Materialized parameter, doing so starts to go against the reason for using configuration objects in the first place: reducing the number of overloads. Also, users who only want to name the stores don't need two Materialized instances; the base name suffices.

Public Interfaces

This KIP will add the following methods to the KStream interface. 

...

StreamJoined (Java)
public class StreamJoined<K, V1, V2, S1 extends StateStore, S2 extends StateStore> implements NamedOperation<StreamJoined<K, V1, V2, S1, S2>> {

public static <K, V1, V2> StreamJoined<K, V1, V2, WindowStore<Bytes, byte[]>, WindowStore<Bytes, byte[]>> as(final WindowBytesStoreSupplier storeSupplier,
                                                                                                             final WindowBytesStoreSupplier otherSupplier){}

public static <K, V1, V2, S1 extends StateStore, S2 extends StateStore> StreamJoined<K, V1, V2, S1, S2> as(final String storeName) {}

public static <K, V1, V2, S1 extends StateStore, S2 extends StateStore> StreamJoined<K, V1, V2, S1, S2> with(final Serde<K> keySerde,
                                                                                                             final Serde<V1> valueSerde,
                                                                                                             final Serde<V2> otherValueSerde) {}
 // The withName method names the join processor and provides the base name
 // for any repartition topics, if required

 public StreamJoined<K, V1, V2, S1, S2> withName(final String name) {}

 // The withStoreName is used as the base name for stores provided by Kafka Streams
 // If users provide state store suppliers, then the name in the store supplier is used
 public StreamJoined<K, V1, V2, S1, S2> withStoreName(final String storeName) {}

 public StreamJoined<K, V1, V2, S1, S2> withKeySerde(final Serde<K> keySerde) {}

 public StreamJoined<K, V1, V2, S1, S2> withValueSerde(final Serde<V1> valueSerde) {}

 public StreamJoined<K, V1, V2, S1, S2> withOtherValueSerde(final Serde<V2> otherValueSerde) {}

 public StreamJoined<K, V1, V2, WindowStore<Bytes, byte[]>, S2> withStoreSupplier(final WindowBytesStoreSupplier storeSupplier) {}

 public StreamJoined<K, V1, V2, S1, WindowStore<Bytes, byte[]>> withOtherStoreSupplier(final WindowBytesStoreSupplier otherStoreSupplier) {}

 public StreamJoined<K, V1, V2, S1, S2> withLogging(final boolean loggingEnabled) {}

 public StreamJoined<K, V1, V2, S1, S2> withCaching(final boolean cachingEnabled) {}

 public StreamJoined<K, V1, V2, S1, S2> withTopicConfig(final Map<String, String> topicConfig) {}

 public StreamJoined<K, V1, V2, S1, S2> withRetention(final Duration retention) throws IllegalArgumentException {}
}


Proposed Changes

With this in mind, this KIP aims to add a new configuration object, StreamJoined. The StreamJoined configuration allows users to specify the Serdes for the join, name the join processor, set the base name of the default state stores, and provide store suppliers; it is essentially a merger of the Joined and Materialized configuration objects. We'll add overloaded KStream#join methods that accept a StreamJoined parameter instead of a Joined parameter. The overloads will apply to all flavors of the KStream join (join, left, and outer). Due to the significant overlap with the new StreamJoined configuration, we'll also deprecate the Joined configuration object and the methods that use it.

...

We should note that providing store suppliers to the join will not enable interactive queries over the join state stores. We'll update the documentation to state as much.

Semantics of StreamJoined

With the ability to provide so many configuration items, we should probably discuss the semantics of the new configuration.

Serdes

  • Users are always required to provide the serdes for the key and for the value types of both streams. The serdes are needed when constructing the stores so that Kafka Streams can (de)serialize the keys and values properly.
  • Kafka Streams will use the serdes to configure both stores, regardless of whether the stores are the defaults or come from user-provided StoreSupplier instances.

Naming

  • The StreamJoined#withName method names only the join processor and provides the base name of any required repartition topics. If StreamJoined#withName is not used, Kafka Streams auto-generates the names of the join processor and of any repartition topics.
  • The StreamJoined#withStoreName method provides only the base name for the state stores of the join. If users pass in StoreSupplier(s), then the name of each StoreSupplier is used. The StoreSupplier names always take priority over names coming from StreamJoined#withStoreName.
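
The precedence described above can be sketched as a small, self-contained function. This is only an illustration of the rule, not the Kafka Streams implementation: the class JoinStoreNaming, the resolve method, the two suffix constants, and the "generated-..." fallback strings are all hypothetical names introduced here.

```java
final class JoinStoreNaming {
    // Hypothetical suffixes, chosen only to tell the two join stores apart in this sketch.
    static final String THIS_SUFFIX = "-this-join-store";
    static final String OTHER_SUFFIX = "-other-join-store";

    /**
     * Resolves the name of one join store.
     *
     * @param supplierName  name carried by a user-provided StoreSupplier, or null if none was given
     * @param baseStoreName value passed to withStoreName, or null if it was never called
     * @param suffix        suffix that distinguishes this store from the other store of the join
     * @param generatedName fallback name that would otherwise be auto-generated
     */
    static String resolve(final String supplierName,
                          final String baseStoreName,
                          final String suffix,
                          final String generatedName) {
        if (supplierName != null) {
            return supplierName;            // a supplier name always takes priority
        }
        if (baseStoreName != null) {
            return baseStoreName + suffix;  // base name from withStoreName
        }
        return generatedName;               // nothing provided: auto-generated name
    }

    public static void main(final String[] args) {
        // One supplier for "this" side; the default store on the other side uses the base name.
        System.out.println(resolve("custom-left-store", "orders", THIS_SUFFIX, "generated-left"));
        System.out.println(resolve(null, "orders", OTHER_SUFFIX, "generated-right"));
        // Neither a supplier nor a base name: fall back to the generated name.
        System.out.println(resolve(null, null, OTHER_SUFFIX, "generated-right"));
    }
}
```

Note that the name given to withName never enters this function: under the semantics above it affects only the join processor and repartition topic names.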

Store Suppliers

  • Users can provide 0, 1 or 2 StoreSupplier instances.
    • When providing zero StoreSuppliers, Kafka Streams will create two persistent state stores as it does now. The names for the stores will be auto-generated unless the user gives a base name with the StreamJoined#withStoreName method.
    • When providing one StoreSupplier, Kafka Streams will create one persistent state store, and the StoreSupplier will create the other state store. The name for the default store will be auto-generated unless the user gives a base name with the StreamJoined#withStoreName method.
    • When providing two StoreSuppliers, the state stores for the join will come from the suppliers, and the names will come from the name given to the supplier.
  • There will be a runtime check to ensure that the values of JoinWindows match those of the provided StoreSuppliers.
  • The name of the StoreSupplier will take priority over the name given via StreamJoined#withStoreName.
  • When using any store configuration methods (disabling logging, caching, topic configurations), KafkaStreams will apply these changes to both stores. While this may seem strict, since both stores are participating in the join, it would seem prudent to have both stores configured the same way.
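
The runtime check mentioned above could look roughly like the following sketch. The KIP text does not say which values are compared, so this assumes two: the supplier's window size must equal the join window size (before + after), and its retention must equal that size plus the grace period. WindowSpec and SupplierSpec are hypothetical stand-ins for JoinWindows and a window store supplier, so that the example runs without the Kafka Streams dependency.

```java
import java.time.Duration;

final class JoinWindowCheck {

    /** Hypothetical stand-in for the window settings requested by the join. */
    static final class WindowSpec {
        final Duration before;
        final Duration after;
        final Duration grace;

        WindowSpec(final Duration before, final Duration after, final Duration grace) {
            this.before = before;
            this.after = after;
            this.grace = grace;
        }

        Duration size() {
            return before.plus(after);
        }
    }

    /** Hypothetical stand-in for the settings carried by a user-provided store supplier. */
    static final class SupplierSpec {
        final String name;
        final Duration windowSize;
        final Duration retention;

        SupplierSpec(final String name, final Duration windowSize, final Duration retention) {
            this.name = name;
            this.windowSize = windowSize;
            this.retention = retention;
        }
    }

    /** Throws IllegalArgumentException if the supplier does not match the join windows. */
    static void assertMatches(final WindowSpec windows, final SupplierSpec supplier) {
        if (!supplier.windowSize.equals(windows.size())) {
            throw new IllegalArgumentException("Store " + supplier.name + " has window size "
                + supplier.windowSize + " but the join expects " + windows.size());
        }
        // Assumption: retention must cover the window plus the grace period.
        final Duration expectedRetention = windows.size().plus(windows.grace);
        if (!supplier.retention.equals(expectedRetention)) {
            throw new IllegalArgumentException("Store " + supplier.name + " has retention "
                + supplier.retention + " but the join expects " + expectedRetention);
        }
    }

    public static void main(final String[] args) {
        final WindowSpec windows =
            new WindowSpec(Duration.ofMinutes(5), Duration.ofMinutes(5), Duration.ofMinutes(1));
        // Window size 10 minutes, retention 11 minutes: passes the check.
        assertMatches(windows, new SupplierSpec("left", Duration.ofMinutes(10), Duration.ofMinutes(11)));
        System.out.println("supplier settings match the join windows");
    }
}
```

Failing fast at topology-build time, rather than at the first join, keeps a mismatched supplier from silently dropping or retaining records outside the intended window.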


Compatibility, Deprecation, and Migration Plan

Since the changes are additions in a strict sense, there are no compatibility issues with existing code.

  • For users who have not named the join repartition node, there are no migration concerns.
  • For users who have previously named the join repartition node and want to upgrade to 2.4, a rolling restart should be possible since the store name will not change.
  • Users electing to add a name to the state store will need to stop all instances, redeploy the new code, and then reset the application before starting it, to ensure that no data is left unprocessed.
  • We'll deprecate the Joined config and the methods that use a Joined object. These items will be removed in a later major release.
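
One way to see why adding a store name calls for a reset is to look at how names propagate to changelog topics. The sketch below is illustrative and rests on assumptions not stated in this KIP: that generated join store names embed a zero-padded topology-wide counter (for example KSTREAM-JOINTHIS-0000000004-store) and that changelog topics are named <application.id>-<storeName>-changelog. The class name, the application id "my-app", the counter values, and the store name "orders-this-join-store" are hypothetical.

```java
import java.util.Locale;

final class GeneratedNameShift {

    /** Builds a generated store name from the processor's position in the topology (assumed format). */
    static String generatedJoinStoreName(final int processorIndex) {
        return String.format(Locale.ROOT, "KSTREAM-JOINTHIS-%010d-store", processorIndex);
    }

    /** Builds the changelog topic name for a store (assumed naming convention). */
    static String changelogTopic(final String applicationId, final String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(final String[] args) {
        // Before and after inserting an upstream operator: the counter moves, so the topic moves.
        System.out.println(changelogTopic("my-app", generatedJoinStoreName(4)));
        System.out.println(changelogTopic("my-app", generatedJoinStoreName(6)));
        // With an explicit store name, the topic is independent of the counter.
        System.out.println(changelogTopic("my-app", "orders-this-join-store"));
    }
}
```

Under these assumptions, switching from a generated name to an explicit one points the store at a different changelog topic, so state recorded under the old topic is not picked up after the change.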

Rejected Alternatives

  • Adding a configuration to indicate whether Joined.named should be used for the state store name. This was ruled out because it's not a best practice to add configs for this type of change.
  • Adding a method to the Joined config item. This was ruled out because it seemed to clutter the API.