Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In the 2.2 release of Kafka Streams (KIP-372 https://cwiki.apache.org/confluence/display/KAFKA/KIP-372%3A+Naming+Repartition+Topics+for+Joins+and+Grouping), we added the Grouped class, which lets users name the repartition topic for aggregation operations. In that KIP, we also piggybacked a change that lets users name the repartition topics of join operations.

In KIP-307 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL) we decided to extend the base name users provide for the repartition topic (from KIP-372) to the join operator and to the state stores as well, which also changes the names of the changelog topics, unless users had already named the state stores via a Materialized object. While naming the state stores for a join does not expose them for interactive queries (IQ), it is a recommended practice because it keeps the changelog topic names stable in the face of upstream changes in the topology.

We overlooked one point when discussing KIP-307: the KStream#join method does not take a Materialized object, so users have never been able to name the state stores, and hence the changelog topics, of a join. Due to its scope, KIP-307 was broken up over several PRs, and the first two were included in the 2.3 release. A partially implemented KIP is not in itself a problem, as those PRs only added the internal classes needed to name operators. But because KStream#join does not take a Materialized object, if users elect to name the Join processor (and hence the repartition topics), then when 2.4 is released the provided name will automatically extend to the state stores for the Join and change the changelog topic names as well. This sudden state store name change is a breaking change that might catch users by surprise.

Public Interfaces

This KIP will add the following methods to the KStream interface. 

Methods Added to KStream
<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                             final JoinWindows windows,
                             final Joined<K, V, VO> joined,
                             final Materialized materialized);

<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final JoinWindows windows,
                                 final Joined<K, V, VO> joined,
                                 final Materialized materialized);

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                  final JoinWindows windows,
                                  final Joined<K, V, VO> joined,
                                  final Materialized materialized);


This KIP will also add the following methods to the Materialized class:

Methods Added to Materialized
public Materialized<K, V, S> withQueryingDisabled()
public Materialized<K, V, S> withQueryingEnabled()

Proposed Changes

With this in mind, this KIP aims to add an overloaded KStream#join method accepting a Materialized parameter. The overloads will apply to all flavors of KStream#join (join, leftJoin, and outerJoin). This will allow users to upgrade to 2.4 without having to make a breaking change. The Joined#as method will now name only the Join processor and the repartition topics, and will no longer name the associated state stores and changelog topics. So if users upgrade to 2.4 and don't provide a name for the state stores, the stores will continue to use the same auto-generated names as in 2.3. Of course, users who elect to name the state stores can do so via Materialized, and doing so is recommended because it pins the changelog topic names against any upstream topology changes.
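The naming rule described above can be sketched in plain Java. This is only an illustration under stated assumptions: JoinNamingSketch, its methods, the application id, and the generated store name are all hypothetical and are not part of Kafka Streams. The only detail taken from Kafka Streams itself is the changelog topic pattern <application.id>-<store name>-changelog.

```java
import java.util.Optional;

// Hypothetical model of the proposed naming rule; not Kafka Streams code.
final class JoinNamingSketch {

    // Kafka Streams names changelog topics <application.id>-<store name>-changelog.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Proposed rule: the store name comes from Materialized if one is given,
    // otherwise from the auto-generated name. The Joined name plays no part.
    static String storeName(Optional<String> materializedName, String generatedName) {
        return materializedName.orElse(generatedName);
    }

    public static void main(String[] args) {
        String appId = "orders-app";                             // illustrative application.id
        String generated = "KSTREAM-JOINTHIS-0000000004-store";  // illustrative generated name

        // Upgrading without a Materialized name keeps the generated store name.
        System.out.println(changelogTopic(appId, storeName(Optional.empty(), generated)));

        // Naming the store via Materialized pins the changelog topic name.
        System.out.println(changelogTopic(appId, storeName(Optional.of("orders-join-store"), generated)));
    }
}
```

In this sketch the Joined name is deliberately absent from storeName, which mirrors the point of the proposal: renaming the Join processor should not move the changelog topic.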

Additionally, we'll add two methods to Materialized: withQueryingEnabled and withQueryingDisabled. There are a few reasons for adding them.

  1. Users may wish to name a state store and hence the changelog for compatibility reasons so that future changes to the topology won't cause a shift in the changelog topic name, but not expose these stores to interactive queries.
  2. The second case is a mirror image of the first. Users may wish to expose a state store for interactive queries for debugging purposes, but for compatibility or other reasons, these users don't want to name the store. While this use case may be unlikely, we should support all possible use cases within reason.
  3. State stores for joins should not be exposed to interactive queries. So going forward, when users opt to name the state stores for joins, Kafka Streams will automatically disable querying on these stores via the added Materialized#withQueryingDisabled method.
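The three cases above amount to treating the store name and the queryable flag as independent settings. A minimal sketch of that idea follows. StoreSettingsSketch is a hypothetical stand-in for Materialized, and its defaults (a named store starts out queryable, an unnamed one does not) are an assumption made for illustration, not behavior confirmed by this KIP.

```java
// Hypothetical stand-in for Materialized; models only the name and the queryable flag.
final class StoreSettingsSketch {
    private final String name;        // null means "use the auto-generated name"
    private final boolean queryable;

    private StoreSettingsSketch(String name, boolean queryable) {
        this.name = name;
        this.queryable = queryable;
    }

    // Assumed default: an unnamed store is not queryable.
    static StoreSettingsSketch unnamed() {
        return new StoreSettingsSketch(null, false);
    }

    // Assumed default: a named store is queryable.
    static StoreSettingsSketch named(String name) {
        return new StoreSettingsSketch(name, true);
    }

    // Each method returns a copy, so the name is never affected by the flag.
    StoreSettingsSketch withQueryingDisabled() {
        return new StoreSettingsSketch(name, false);
    }

    StoreSettingsSketch withQueryingEnabled() {
        return new StoreSettingsSketch(name, true);
    }

    boolean hasStableName() {
        return name != null;
    }

    boolean isQueryable() {
        return queryable;
    }

    public static void main(String[] args) {
        // Case 1 (and case 3 for joins): stable name, hidden from interactive queries.
        StoreSettingsSketch pinned = named("orders-join-store").withQueryingDisabled();
        System.out.println(pinned.hasStableName() + " " + pinned.isQueryable());

        // Case 2: no stable name, but exposed for debugging.
        StoreSettingsSketch debug = unnamed().withQueryingEnabled();
        System.out.println(debug.hasStableName() + " " + debug.isQueryable());
    }
}
```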

Compatibility, Deprecation, and Migration Plan

Since the changes are additions in a strict sense, there are no compatibility issues with existing code.

  • For users who have not named the Join repartition node, there are no migration concerns.
  • Users who have previously named the Join repartition node and want to upgrade to 2.4 should be able to do a rolling restart: the state store will now take its name from the Materialized object, so the name of the state store will not change.
  • Users electing to add a name to the state store will need to stop all instances, redeploy the new code, and reset the application before starting, to ensure no data is left unprocessed.

Rejected Alternatives

  • Adding a configuration to indicate whether the Joined.named should be used for the state store name. This was ruled out because it's not a best practice to add configs for this type of change.
  • Adding a method to the Joined config item. This was rejected because it seemed to clutter the API.