...

As we know, the KeyGroupRange of each operator is computed statically before the job is executed, while splits are discovered and assigned dynamically at runtime. Split assignment can be push-based or pull-based, depending on the connector. Therefore, the mapping from Split to KeyGroup has to be dynamic, which means that the user cannot provide Flink with a mapping from Split to KeyGroup. The mapping has to be built and maintained by Flink at runtime.

Therefore, Flink should maintain a new "Assigner" (a "KeyGroupRangeAssignment" kept at the task level), which takes responsibility for the following things:

  1. Whenever a split is added to or removed from the SourceOperator, the "Assigner" needs to map the new split to an unused KeyGroup, or delete the mapping of the removed split.
  2. Maintain a mapping from each record/key to a KeyGroup according to the split it belongs to.
  3. Given a key, first compute the splitID it belongs to with the function provided by the user, then return the KeyGroup that the splitID is mapped to.
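The three responsibilities above can be sketched as a small task-local class. This is only an illustration under stated assumptions: `SplitKeyGroupAssigner`, its method names, and the use of a plain `Function` as the user-provided key-to-splitID function are hypothetical and not part of Flink's API. The sketch also assumes one KeyGroup per split and that the task owns a contiguous KeyGroup range.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical task-level "Assigner"; an illustration, not an existing Flink class. */
final class SplitKeyGroupAssigner<K> {
    // KeyGroups of this task's range that are not yet bound to any split.
    private final Deque<Integer> unusedKeyGroups = new ArrayDeque<>();
    // Dynamic mapping splitID -> KeyGroup, built at runtime.
    private final Map<String, Integer> splitToKeyGroup = new HashMap<>();
    // User-provided function Key -> splitID (assumed shape).
    private final Function<K, String> splitSelector;

    SplitKeyGroupAssigner(int startKeyGroup, int endKeyGroup, Function<K, String> splitSelector) {
        for (int kg = startKeyGroup; kg <= endKeyGroup; kg++) {
            unusedKeyGroups.addLast(kg);
        }
        this.splitSelector = splitSelector;
    }

    /** Responsibility 1: bind a newly added split to an unused KeyGroup. */
    int addSplit(String splitId) {
        Integer existing = splitToKeyGroup.get(splitId);
        if (existing != null) {
            return existing;
        }
        Integer keyGroup = unusedKeyGroups.pollFirst();
        if (keyGroup == null) {
            throw new IllegalStateException("No unused KeyGroup left for split " + splitId);
        }
        splitToKeyGroup.put(splitId, keyGroup);
        return keyGroup;
    }

    /** Responsibility 1: drop the mapping of a removed split and free its KeyGroup. */
    void removeSplit(String splitId) {
        Integer keyGroup = splitToKeyGroup.remove(splitId);
        if (keyGroup != null) {
            unusedKeyGroups.addFirst(keyGroup);
        }
    }

    /** Responsibilities 2 and 3: Key -> splitID -> KeyGroup. */
    int keyGroupFor(K key) {
        String splitId = splitSelector.apply(key);
        Integer keyGroup = splitToKeyGroup.get(splitId);
        if (keyGroup == null) {
            throw new IllegalStateException("Split " + splitId + " is not assigned to this task");
        }
        return keyGroup;
    }
}
```

In this sketch a freed KeyGroup is reused first, so the set of occupied KeyGroups stays compact; whether that is desirable depends on how state of removed splits should be handled, which the sketch does not address.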

The user needs to provide Flink with the following mapping:

                                                        Record → Key → SplitID

  1. The mapping from a record to its key can be done by the KeySelector, which the keyed state backend needs in order to maintain the state.
  2. The mapping from a record or a key to the SplitID is needed by the "Assigner" to assign the KeyGroup.

...

Code Block (Java): DataStream
/**
 * ...
 * 
 *
 * @param keySelector Function that defines how keys are extracted from the data stream.
 * @param splitSelector Function that defines how a key is mapped to its splitID.
 * @param <K> Type of the extracted keys.
 * @return The reinterpretation of the {@link DataStream} as a {@link KeyedStream}.
 */
public <K> KeyedStream<T, K> reinterpretAsKeyedStream(KeySelector<T, K> keySelector, SplitSelector<K, String> splitSelector) {}
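The signature above refers to a `SplitSelector` type that is not defined in this section. A minimal sketch of what such an interface and one implementation might look like follows. Everything here is an assumption for illustration: the interface shape, the method name `getSplit`, and the `ModuloSplitSelector` class are hypothetical, and the hash-modulo scheme is a stand-in that does not reproduce the partitioner of Kafka or any other real connector.

```java
import java.io.Serializable;

/** Hypothetical shape of the SplitSelector used in the signature above. */
@FunctionalInterface
interface SplitSelector<K, S> extends Serializable {
    /** Returns the ID of the split that the given key belongs to. */
    S getSplit(K key) throws Exception;
}

/**
 * Illustrative selector that assumes the upstream writer partitioned records by
 * hash(key) mod numPartitions and that splitIDs look like "topic-partition".
 * Both assumptions are made up for this sketch.
 */
final class ModuloSplitSelector implements SplitSelector<String, String> {
    private static final long serialVersionUID = 1L;

    private final String topic;
    private final int numPartitions;

    ModuloSplitSelector(String topic, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.topic = topic;
        this.numPartitions = numPartitions;
    }

    @Override
    public String getSplit(String key) {
        // floorMod keeps the partition index non-negative for negative hash codes.
        int partition = Math.floorMod(key.hashCode(), numPartitions);
        return topic + "-" + partition;
    }
}
```

Such a selector is only correct if it reproduces exactly the partitioning logic and the splitID format of the upstream system.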

However, the mapping from a record or a key to the SplitID could be obscure to the user. For example, the data could be computed in Flink and then streamed to Flink through a message queue (MQ), e.g., Kafka. In this example, neither the partitioning logic nor the splitID is a public API for users. This could be common in many scenarios. Therefore, it is hard and possibly meaningless for users to provide the "SplitSelector".

...

In this option, we would like to address problems 1 and 2 introduced above. The mapping of a key to a split and of a split to a key group can actually be established inside the Source Operator.

...

For a "pre-KeyedStream", the redistribution of the keyed state of the operators downstream of the SourceTask depends on the source operator. Therefore, the mapping from Split to KeyGroup would have to be persisted and restored so that it can be extracted for redistribution.
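As a rough illustration of persisting the Split-to-KeyGroup mapping and extracting it again for redistribution, the sketch below copies the live mapping into a snapshot and, on restore, selects the entries whose KeyGroup falls into a subtask's new KeyGroup range. The class `SplitKeyGroupSnapshot` and its methods are hypothetical, and the sketch says nothing about how Flink would actually store this mapping in a checkpoint.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper; shows the idea only, not Flink's checkpointing API. */
final class SplitKeyGroupSnapshot {
    private SplitKeyGroupSnapshot() {}

    /** Takes an immutable-by-convention copy of the live splitID -> KeyGroup mapping. */
    static Map<String, Integer> snapshot(Map<String, Integer> liveMapping) {
        return new TreeMap<>(liveMapping);
    }

    /**
     * On restore, extracts the splits whose KeyGroup lies in the inclusive range
     * [startKeyGroup, endKeyGroup] owned by one subtask after rescaling.
     */
    static Map<String, Integer> restoreForRange(
            Map<String, Integer> snapshot, int startKeyGroup, int endKeyGroup) {
        Map<String, Integer> restored = new TreeMap<>();
        for (Map.Entry<String, Integer> entry : snapshot.entrySet()) {
            int keyGroup = entry.getValue();
            if (keyGroup >= startKeyGroup && keyGroup <= endKeyGroup) {
                restored.put(entry.getKey(), keyGroup);
            }
        }
        return restored;
    }
}
```

With this shape, the splits restored for a subtask and the keyed state of its downstream operators are selected by the same KeyGroup range, which is the property the paragraph above relies on.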