...

In accordance with the Kafka Streams DSL grammar, we introduce a new parameterless DSL operation, markAsPartitioned(), on KStream. It resets the keyChangingOperation flag of the upstream node.

Code Block
languagejava
public interface KStream<K, V> {

    /**
     * Marking the {@code KStream} as partitioned signals that the stream is already partitioned as intended
     * and does not require further repartitioning in downstream key-changing operations.
     *
     * <p><em>
     *     Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with interactive queries (IQ) or {@link KStream#join}.
     *     This is because when a repartition happens, records are physically shuffled by a composite key defined in the stateful operation.
     *     However, if the repartition is cancelled, records stay in their original partition, keyed by their original key. IQ and joins
     *     assume and use the composite key instead of the original key.
     * </em></p>
     *
     * @return a new, mutated {@code KStream} that will not repartition in subsequent operations.
     */
    KStream<K, V> markAsPartitioned();

}

Proposed Change

In KStreamImpl, the boolean flag repartitionRequired is set to false when the method is invoked. Calling the new DSL operation returns a new, mutated KStream, so downstream operations chained onto this new instance will not repartition. The original stream keeps its own internal property and continues to operate in the default way.
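The copy-on-mark behavior can be illustrated with a minimal, hypothetical toy model. This is not the actual KStreamImpl code: ToyStream, selectKey() and groupByKeyAndAggregate() are illustrative names. The only idea taken from the proposal is that markAsPartitioned() returns a new instance with the repartitionRequired flag cleared, while the original instance keeps its flag.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the flag handling; NOT the real KStreamImpl.
final class ToyStream {
    // Mirrors the idea of KStreamImpl's repartitionRequired flag.
    private final boolean repartitionRequired;
    // Topology steps added so far, recorded for illustration only.
    private final List<String> topology;

    ToyStream(boolean repartitionRequired, List<String> topology) {
        this.repartitionRequired = repartitionRequired;
        this.topology = topology;
    }

    boolean repartitionRequired() {
        return repartitionRequired;
    }

    // A key-changing operation: downstream stateful operations must repartition.
    ToyStream selectKey() {
        List<String> next = new ArrayList<>(topology);
        next.add("selectKey");
        return new ToyStream(true, next);
    }

    // Returns a NEW instance with the flag cleared; `this` is left untouched.
    ToyStream markAsPartitioned() {
        List<String> next = new ArrayList<>(topology);
        next.add("markAsPartitioned");
        return new ToyStream(false, next);
    }

    // A stateful operation inserts a repartition step only if the flag is set.
    List<String> groupByKeyAndAggregate() {
        List<String> next = new ArrayList<>(topology);
        if (repartitionRequired) {
            next.add("repartition");
        }
        next.add("aggregate");
        return next;
    }
}
```

Because each call returns a fresh instance, a re-keyed stream that is used both directly and through markAsPartitioned() behaves as in the fan-out example: only the marked branch skips the repartition step.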

Usage

Example: cancelling the repartition in a streams aggregation looks like this:

Code Block
languagejava
stream
	.selectKey((key, value) -> ...)   // re-key by (key1, metric)
	.markAsPartitioned()
	.groupByKey()
	.aggregate(...);

Example: fan out from the same stream:

Code Block
languagejava
KStream<K, V> myStream = builder.stream(...).map(...);

// the aggregation will not repartition, as it operates on the mutated KStream
myStream.markAsPartitioned().groupByKey().aggregate(...);

// the join will still repartition, as it operates on the original KStream
myStream.join(myOtherTable, (value, tableValue) -> ...);


Concerns

  • Using this operation complicates interactive queries (IQ) and joins. When a repartition happens, records are physically shuffled by a composite key defined in the stateful operation. However, when the repartition is cancelled, records stay in their original partition, keyed by their original key. IQ assumes and uses the composite key instead of the original key, so a lookup can be routed to the wrong partition; this is how IQ can break downstream. The same applies to joins.
  • In the documentation, we specifically advise against using this operation with IQ or joins.
  • However, a potential way to support IQ is to provide a 'reverse mapping' from the composite key back to the original key, which can then be used to obtain the metadata. We can follow up with such a change if there is demand.
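The IQ problem and the proposed reverse mapping can be sketched as follows. This is a hypothetical illustration only: CompositeKey, partitionFor and reverseMap are not Kafka APIs, and partitionFor uses Object.hashCode modulo the partition count as a stand-in for Kafka's default partitioner (which hashes the serialized key bytes with murmur2). Records require Java 16+.

```java
import java.util.Objects;

// Hypothetical sketch: why a cancelled repartition breaks key-based lookups,
// and how a reverse mapping from composite key to original key could help.
final class ReverseMappingSketch {

    // Hypothetical composite key built by a stateful operation.
    record CompositeKey(String originalKey, String metric) { }

    // Simplified partitioner for illustration only (not murmur2).
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    // Hypothetical reverse mapping: restore the key the record was
    // actually partitioned by, so metadata lookups hit the right partition.
    static String reverseMap(CompositeKey key) {
        return key.originalKey();
    }

    // Where the record really lives when the repartition was cancelled.
    static int actualPartition(CompositeKey key, int numPartitions) {
        return partitionFor(key.originalKey(), numPartitions);
    }

    // Where a lookup by the composite key would look.
    static int naiveLookupPartition(CompositeKey key, int numPartitions) {
        return partitionFor(key, numPartitions);
    }

    // Where a lookup that applies the reverse mapping first would look.
    static int mappedLookupPartition(CompositeKey key, int numPartitions) {
        return partitionFor(reverseMap(key), numPartitions);
    }
}
```

A lookup by the composite key generally lands on a different partition than the one holding the record, while applying the reverse mapping first always finds it.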

...