Status

...

Page properties


Discussion thread

...

...


Release


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

According to FLINK-8571, "Provide an enhanced KeyedStream implementation to use ForwardPartitioner", a DataStream can be reinterpreted as a KeyedStream with a ForwardPartitioner in the following two use cases.

...

This FLIP aims to address this problem and proposes that we formally introduce reinterpretAsKeyedStream() on DataStream, so that more users become aware of this API.

Public Interfaces


Code Block (Java): DataStream
/**
 * Reinterprets the given {@link DataStream} as a {@link KeyedStream}, which extracts keys with
 * the given {@link KeySelector}.
 *
 * <p>IMPORTANT: The base stream must either have been partitioned by
 * {@link DataStream#keyBy(KeySelector)} or be a "Pre-KeyedStream". In addition, the selected key
 * and the partitioning of the base stream must have remained unchanged before this transformation.
 *
 * <p>"Pre-KeyedStream": The base stream is read from an external system, where it has already
 * been grouped by key and partitioned based on {@link SourceSplit}.
 *
 * <p>For a "Pre-KeyedStream", the maximum number of currently existing splits must not be
 * larger than the maximum number of key groups, because every split is mapped to one key
 * group. To configure the number of key groups, set the maximum supported parallelism.
 *
 * <p>IMPORTANT: For a "Pre-KeyedStream", if chaining is disabled for debugging, this method
 * might not work.
 *
 * <p>With the reinterpretation, the upstream operators can be chained with the downstream
 * operator, which avoids shuffling data and network consumption between two subtasks.
 * If shuffling or network consumption is acceptable, {@link DataStream#keyBy(KeySelector)} is
 * recommended.
 *
 * @param keySelector Function that defines how keys are extracted from the data stream.
 * @param <K> Type of the extracted keys.
 * @return The reinterpretation of the {@link DataStream} as a {@link KeyedStream}.
 */
public <K> KeyedStream<T, K> reinterpretAsKeyedStream(KeySelector<T, K> keySelector) {}

/**
 * @see #reinterpretAsKeyedStream(KeySelector)
 *
 * @param keySelector Function that defines how keys are extracted from the data stream.
 * @param typeInfo Explicit type information about the key type.
 * @param <K> Type of the extracted keys.
 * @return The reinterpretation of the {@link DataStream} as a {@link KeyedStream}.
 */
public <K> KeyedStream<T, K> reinterpretAsKeyedStream(KeySelector<T, K> keySelector, TypeInformation<K> typeInfo) {}

Usage Examples

Code Block
DataStream<Tuple3<String, Integer, Integer>> input = fromSource(...);
// If the input source stream is not a pre-KeyedStream:
// the two aggregation operators are chained together by default.
DataStream<Tuple3<String, Integer, Integer>> result =
        input.keyBy(value -> value.f0)
                .sum(1)
                .reinterpretAsKeyedStream(value -> value.f0)
                .sum(2);
// If the input source stream is a pre-KeyedStream:
// the source operator and the following aggregation operators are chained together by default.
DataStream<Tuple3<String, Integer, Integer>> preKeyedResult =
        input.reinterpretAsKeyedStream(value -> value.f0)
                .sum(1)
                .reinterpretAsKeyedStream(value -> value.f0)
                .sum(2);

Proposed Changes

First, let us define the "pre-KeyedStream".

...

Since the problem is caused by the pre-KeyedStream, all of the following discussion and proposed modifications are limited to the SourceTask.

Option 1. Re-define KeyGroupRangeAssignment

The KeyGroupRangeAssignment does two main things.

...

However, the mapping from a record or a key to the split ID could be obscure to the user. For example, the data could be computed in Flink and then streamed back into Flink through a message queue (MQ) such as Kafka. In this example, neither the partitioner nor the split ID is a public API for users. This could be common in many scenarios. Therefore, it is hard, and possibly meaningless, for users to provide the "SplitSelector".
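To make the difficulty concrete, the following is a minimal sketch of what Option 1 would ask of users. It is not Flink code: the class and method names are hypothetical, the split selector is modeled as a plain `java.util.function.Function` because this FLIP only names "SplitSelector" without fixing its shape, and `Math.floorMod` on `hashCode()` is a stand-in for the hash Flink actually applies.

```java
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch only; these names are not Flink API. */
final class KeyGroupAssignmentSketch {

    private KeyGroupAssignmentSketch() {}

    /** Default behavior: the key group is derived from the key's hash alone. */
    static int assignByHash(Object key, int maxParallelism) {
        // Stand-in hash; floorMod keeps the result in [0, maxParallelism).
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /**
     * Option 1 idea: the user supplies a function from key to split ID, and the key group is
     * looked up from a split-to-key-group mapping (assumed to be maintained elsewhere).
     */
    static <K> int assignBySplit(
            K key, Function<K, String> splitSelector, Map<String, Integer> splitToKeyGroup) {
        String splitId = splitSelector.apply(key);
        Integer keyGroup = splitToKeyGroup.get(splitId);
        if (keyGroup == null) {
            throw new IllegalStateException("No key group registered for split " + splitId);
        }
        return keyGroup;
    }
}
```

The second method only works if the user can write `splitSelector` correctly, which requires knowing how the external system routed each key to a split. That is exactly the knowledge the paragraph above argues users often do not have.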



Option 2. Assign KeyGroup by the SourceOperator (Preferred)

In this option, we would like to address the problems introduced above. The mapping of a key to a split and a split to a key group can actually be established inside the Source Operator. 

...

Overall, instead of hashing the key of a record to obtain its key group, the key group would now be allocated by the SourceOperator if the input stream is a "pre-KeyedStream". The key group assignment would be propagated among the operators within the SourceTask. In general, the partitioning between two tasks is all-to-all, which leads to a data shuffle. Therefore, a downstream operator outside the SourceTask that still wants to take advantage of KeyedStream's features has to repartition the data stream through keyBy().
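The bookkeeping this implies can be sketched roughly as follows. This is an illustration under simplifying assumptions, not the proposed implementation: `SplitKeyGroupAssigner` is a hypothetical name, split IDs are plain strings, key groups are handed out sequentially, and the sketch ignores that each subtask only owns a range of key groups.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of split-to-key-group bookkeeping inside a source operator. */
final class SplitKeyGroupAssigner {

    // The number of key groups equals the maximum parallelism.
    private final int maxParallelism;
    private final Map<String, Integer> splitToKeyGroup = new LinkedHashMap<>();

    SplitKeyGroupAssigner(int maxParallelism) {
        if (maxParallelism <= 0) {
            throw new IllegalArgumentException("maxParallelism must be positive");
        }
        this.maxParallelism = maxParallelism;
    }

    /** Returns the key group of a split, allocating the next free one on first sight. */
    int keyGroupFor(String splitId) {
        Integer existing = splitToKeyGroup.get(splitId);
        if (existing != null) {
            return existing;
        }
        if (splitToKeyGroup.size() >= maxParallelism) {
            // Every split needs its own key group, so splits must not outnumber key groups.
            throw new IllegalStateException(
                    "More splits than key groups (" + maxParallelism + ")");
        }
        int keyGroup = splitToKeyGroup.size();
        splitToKeyGroup.put(splitId, keyGroup);
        return keyGroup;
    }

    /** Copy of the current mapping, e.g. to be stored for later redistribution. */
    Map<String, Integer> snapshot() {
        return new LinkedHashMap<>(splitToKeyGroup);
    }
}
```

Every record read from a split would carry that split's key group to the chained operators, so no key hashing is needed inside the SourceTask.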

Rescaling

For a "pre-KeyedStream", the redistribution of the keyed state of the downstream operators in the SourceTask depends on the source operator. Therefore, the mapping of split to key group would be stored persistently and extracted for redistribution.
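As a rough illustration of why the stored mapping is enough for rescaling, the sketch below regroups splits by the subtask that owns their key group after a parallelism change. It assumes key groups map to subtasks as `keyGroup * parallelism / maxParallelism`, which is how Flink assigns key groups to operator indices today. The class name and the string split IDs are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: splits follow their key groups when the parallelism changes. */
final class SplitRescalingSketch {

    private SplitRescalingSketch() {}

    /** Index of the subtask that owns the given key group. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Groups the stored split-to-key-group mapping by the owning subtask. */
    static Map<Integer, List<String>> redistribute(
            Map<String, Integer> splitToKeyGroup, int maxParallelism, int parallelism) {
        Map<Integer, List<String>> splitsPerSubtask = new TreeMap<>();
        for (Map.Entry<String, Integer> entry : splitToKeyGroup.entrySet()) {
            int subtask =
                    operatorIndexForKeyGroup(maxParallelism, parallelism, entry.getValue());
            splitsPerSubtask.computeIfAbsent(subtask, i -> new ArrayList<>()).add(entry.getKey());
        }
        return splitsPerSubtask;
    }
}
```

Because a split and the keyed state of its key group always land on the same subtask, the chained downstream operators keep seeing the keys whose state they hold.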

Conclusion

For option 1, one obvious advantage is that the implementation could be simple: only "KeyGroupRangeAssignment#assignToKeyGroup" needs to be replaced with a dynamic one.

...

Both options share a common limitation: the number of splits must not be larger than the number of key groups, because ultimately we want to build a mapping from a split to a key group to ensure correctness during rescaling.

Compatibility, Deprecation, and Migration Plan

This improvement will not impact the current users.

Test Plan

Additional unit tests will be added to verify that the API works well when the input stream is a "Pre-KeyedStream".

Rejected Alternatives

No options have been rejected yet.