...

Therefore, Flink should maintain a new "Assigner", which is responsible for the following:

  1. Whenever a split is added to or removed from the SourceOperator, the "Assigner" needs to either map the new split to an unused KeyGroup or delete the mapping of the removed split.
  2. Given a key, first compute the splitID it belongs to with the function provided by the user, then return the KeyGroup that the splitID is mapped to.
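The two responsibilities above can be sketched as follows. This is only an illustration under stated assumptions, not the proposed implementation: the class name SplitKeyGroupAssigner, its methods, the use of java.util.function.Function as a stand-in for the user-provided function, and the policy of returning a released KeyGroup to the pool of unused ones are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch of the "Assigner"; none of these names exist in Flink. */
class SplitKeyGroupAssigner<K> {
    // Current mapping of splitID -> KeyGroup index.
    private final Map<String, Integer> splitToKeyGroup = new HashMap<>();
    // KeyGroups owned by this subtask that are not mapped to any split yet.
    private final Deque<Integer> unusedKeyGroups = new ArrayDeque<>();
    // Stand-in for the user-provided key -> splitID function.
    private final Function<K, String> splitSelector;

    SplitKeyGroupAssigner(int startKeyGroup, int endKeyGroup, Function<K, String> splitSelector) {
        for (int keyGroup = startKeyGroup; keyGroup <= endKeyGroup; keyGroup++) {
            unusedKeyGroups.addLast(keyGroup);
        }
        this.splitSelector = splitSelector;
    }

    /** Responsibility 1a: map a newly added split to an unused KeyGroup. */
    int addSplit(String splitId) {
        Integer existing = splitToKeyGroup.get(splitId);
        if (existing != null) {
            return existing;
        }
        Integer keyGroup = unusedKeyGroups.poll();
        if (keyGroup == null) {
            throw new IllegalStateException("No unused KeyGroup left for split " + splitId);
        }
        splitToKeyGroup.put(splitId, keyGroup);
        return keyGroup;
    }

    /** Responsibility 1b: delete the mapping of a removed split (assumed: its KeyGroup becomes reusable). */
    void removeSplit(String splitId) {
        Integer keyGroup = splitToKeyGroup.remove(splitId);
        if (keyGroup != null) {
            unusedKeyGroups.addLast(keyGroup);
        }
    }

    /** Responsibility 2: key -> splitID (user function) -> KeyGroup. */
    int assignToKeyGroup(K key) {
        String splitId = splitSelector.apply(key);
        Integer keyGroup = splitToKeyGroup.get(splitId);
        if (keyGroup == null) {
            throw new IllegalStateException("Split " + splitId + " has no KeyGroup mapping");
        }
        return keyGroup;
    }

    public static void main(String[] args) {
        // Hypothetical user function: the splitID is the part of the key before ':'.
        SplitKeyGroupAssigner<String> assigner =
                new SplitKeyGroupAssigner<>(0, 3, key -> key.substring(0, key.indexOf(':')));
        assigner.addSplit("split-a");
        assigner.addSplit("split-b");
        System.out.println(assigner.assignToKeyGroup("split-b:user-42"));
    }
}
```

Whether a released KeyGroup may be reused right away depends on how the keyed state stored under it is handled, which this sketch leaves open.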

...

Code Block (language: java, title: DataStream)
/**
 * ...
 * 
 *
 * @param keySelector Function that defines how keys are extracted from the data stream.
 * @param splitSelector Function that maps a key to the ID of the split it is partitioned to.
 * @param <K> Type of the extracted keys.
 * @return The reinterpretation of the {@link DataStream} as a {@link KeyedStream}.
 */
public <K> KeyedStream<T, K> reinterpretAsKeyedStream(KeySelector<T, K> keySelector, SplitSelector<K, String> splitSelector) {}
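
To make the splitSelector parameter more concrete, here is a minimal sketch of what such a function could look like. The interface shape, the method name getSplit, and the modulo-based partitioning scheme are assumptions made for illustration only; the proposal does not define them, and a real external system would use its own partitioning logic.

```java
import java.io.Serializable;

/** Hypothetical shape of the user-provided selector: maps a key to the ID of its split. */
@FunctionalInterface
interface SplitSelector<K, S> extends Serializable {
    S getSplit(K key);
}

class SplitSelectorSketch {
    /** Assumed partitioning scheme: integer keys are spread over numSplits splits by modulo. */
    static SplitSelector<Integer, String> moduloSelector(int numSplits) {
        return key -> "split-" + Math.floorMod(key, numSplits);
    }

    public static void main(String[] args) {
        SplitSelector<Integer, String> selector = moduloSelector(4);
        System.out.println(selector.getSplit(10)); // prints "split-2"
    }
}
```

The selector must reproduce exactly the partitioning that the external system applied when the data was written; otherwise a key would be routed to the KeyGroup of the wrong split.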

...

For a "pre-KeyedStream", the redistribution of the keyed state of the operators downstream of the SourceTask depends on the source operator. Therefore, the mapping of Split to KeyGroup must be stored persistently so that it can be extracted for redistribution.

Conclusion

For option 1, the obvious pro is that the implementation could be simple. Only "KeyGroupRangeAssignment#assignToKeyGroup" needs to be replaced with a dynamic one.

The con is that users need to provide a mapping from a key to a splitID, and this mapping is usually not exposed as a public API by external systems, e.g., Flink.

Option 2 is just the opposite of option 1. The pro is that users need not provide any information other than the KeySelector, which keeps the API clean for users.

The con is that the implementation could be complicated and sensitive, since it requires wrapping the StreamRecord with a new field. But I believe this is acceptable since the modification is limited to the SourceTask.