...

Therefore, the new "KeyGroupRangeAssignment" would now be a field at the task level. It is responsible for the following things.

  1. Whenever a split is added to or removed from the SourceOperator, the "Assigner" needs to map the new split to an unused KeyGroup, or delete the corresponding mapping for the removed split.
  2. Maintain a mapping from the record/key to the KeyGroup according to the split it belongs to.
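
The two responsibilities above can be sketched as follows. This is a minimal illustration, not the proposed implementation: the class name `SplitKeyGroupAssigner` and all of its methods are hypothetical, it assumes that each split occupies exactly one KeyGroup out of a contiguous range owned by the task, and it ignores what happens to the state stored under a KeyGroup when its split is removed.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the "Assigner"; not part of Flink's API. */
final class SplitKeyGroupAssigner {
    // KeyGroups owned by this task that no split currently occupies.
    private final Deque<Integer> unusedKeyGroups = new ArrayDeque<>();
    // Current mapping from split ID to the KeyGroup assigned to it.
    private final Map<String, Integer> splitToKeyGroup = new HashMap<>();

    /** Assumes the task owns the KeyGroups in [startKeyGroup, endKeyGroup]. */
    SplitKeyGroupAssigner(int startKeyGroup, int endKeyGroup) {
        for (int kg = startKeyGroup; kg <= endKeyGroup; kg++) {
            unusedKeyGroups.addLast(kg);
        }
    }

    /** Responsibility 1: map a newly added split to an unused KeyGroup. */
    int addSplit(String splitId) {
        Integer existing = splitToKeyGroup.get(splitId);
        if (existing != null) {
            return existing; // split already known, keep its KeyGroup
        }
        Integer keyGroup = unusedKeyGroups.pollFirst();
        if (keyGroup == null) {
            throw new IllegalStateException("No unused KeyGroup left for split " + splitId);
        }
        splitToKeyGroup.put(splitId, keyGroup);
        return keyGroup;
    }

    /** Responsibility 1: delete the mapping of a removed split and free its KeyGroup. */
    void removeSplit(String splitId) {
        Integer keyGroup = splitToKeyGroup.remove(splitId);
        if (keyGroup != null) {
            unusedKeyGroups.addFirst(keyGroup);
        }
    }

    /** Responsibility 2: look up the KeyGroup of a record/key via its split. */
    int keyGroupForSplit(String splitId) {
        Integer keyGroup = splitToKeyGroup.get(splitId);
        if (keyGroup == null) {
            throw new IllegalArgumentException("Unknown split " + splitId);
        }
        return keyGroup;
    }
}
```

In this sketch a freed KeyGroup is reused first, which keeps the set of occupied KeyGroups compact; a real implementation might choose a different policy.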

...

  1. The mapping from a record to its key could be done by the KeySelector, which the keyed state backend needs in order to maintain the state.
  2. The mapping from a record or a key to the SplitID is necessary for the "Assigner" to assign the KeyGroup.
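
How the two mappings chain together can be sketched as below. This is only an illustration under stated assumptions: `RecordToKeyGroupResolver` is a hypothetical name, `java.util.function.Function` stands in for Flink's `KeySelector` and for the key-to-SplitID mapping, and the split-to-KeyGroup map is assumed to be the one maintained by the "Assigner".

```java
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch: record -> key -> SplitID -> KeyGroup. */
final class RecordToKeyGroupResolver<T, K> {
    // Stand-in for KeySelector<T, K>: extracts the key from a record.
    private final Function<T, K> keySelector;
    // Stand-in for the user-provided mapping from a key to its SplitID.
    private final Function<K, String> splitSelector;
    // Assumed to be maintained by the "Assigner": SplitID -> KeyGroup.
    private final Map<String, Integer> splitToKeyGroup;

    RecordToKeyGroupResolver(
            Function<T, K> keySelector,
            Function<K, String> splitSelector,
            Map<String, Integer> splitToKeyGroup) {
        this.keySelector = keySelector;
        this.splitSelector = splitSelector;
        this.splitToKeyGroup = splitToKeyGroup;
    }

    /** Resolves the KeyGroup of a record through the split its key belongs to. */
    int resolve(T record) {
        K key = keySelector.apply(record);
        String splitId = splitSelector.apply(key);
        Integer keyGroup = splitToKeyGroup.get(splitId);
        if (keyGroup == null) {
            throw new IllegalStateException("No KeyGroup assigned to split " + splitId);
        }
        return keyGroup;
    }
}
```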

...

Code Block (Java): DataStream
/**
 * ...
 * 
 *
 * @param keySelector Function that defines how keys are extracted from the data stream.
 * @param <K> Type of the extracted keys.
 * @return The reinterpretation of the {@link DataStream} as a {@link KeyedStream}.
 */
public <K> KeyedStream<T, K> reinterpretAsKeyedStream(KeySelector<T, K> keySelector, SplitSelector<K, String> splitSelector) {}

However, the mapping from a record or a key to the SplitID could be obscure to the user. For example, the data could be computed in Flink and then streamed back into Flink through an MQ, e.g., Kafka. Neither the partitioning logic of the computational framework nor the SplitID is a public API for users. It is hard, and possibly meaningless, for users to provide Flink with such a mapping.



Option 2. KeyGroup assigned by the SourceOperator (preferred)

...