...

Code Block
language: java
title: DataStream
/**
 * Reinterprets the given {@link DataStream} as a {@link KeyedStream}, which extracts keys with
 * the given {@link KeySelector}. The precondition is that the base stream is already grouped by
 * key and partitioned, and the selected key must have remained unchanged before this
 * transformation.
 *
 * <p>With the reinterpretation, the upstream operators can now be chained with the downstream
 * operator, thereby avoiding data shuffling and network traffic between the two subtasks.
 * If shuffling or network traffic is acceptable, {@link DataStream#keyBy(KeySelector)} is
 * recommended instead.
 *
 * <p>"Pre-KeyedStream": The base stream comes from an external system, where it has already
 * been grouped by key and partitioned based on {@link SourceSplit}.
 *
 * <p>IMPORTANT: For a "Pre-KeyedStream", the maximum number of concurrently existing splits must
 * not be larger than the maximum number of key groups, since every split is mapped to exactly one
 * key group. To control the number of key groups, configure the maximum supported parallelism.
 *
 * <p>IMPORTANT: For a "Pre-KeyedStream", this method might not work if chaining is disabled,
 * e.g., for debugging.
 *
 * @param keySelector Function that defines how keys are extracted from the data stream.
 * @param <K> Type of the extracted keys.
 * @return The reinterpretation of the {@link DataStream} as a {@link KeyedStream}.
 */
public <K> KeyedStream<T, K> reinterpretAsKeyedStream(KeySelector<T, K> keySelector) {}

/**
 * @see #reinterpretAsKeyedStream(KeySelector)
 *
 * @param keySelector Function that defines how keys are extracted from the data stream.
 * @param typeInfo Explicit type information about the key type.
 * @param <K> Type of the extracted keys.
 * @return The reinterpretation of the {@link DataStream} as a {@link KeyedStream}.
 */
public <K> KeyedStream<T, K> reinterpretAsKeyedStream(KeySelector<T, K> keySelector, TypeInformation<K> typeInfo) {}

...

                                                        Record → Key → SplitID → KeyGroup

However, this could introduce the following problems.

...

  1. However, the paradox is that for a specific subtask, the key group range assignment is static, while the split assignment is dynamic and varies across Sources. For example, split assignment for the Kafka Source is push-based: new splits are assigned to source readers in a round-robin fashion. In contrast, split assignment for the File Source is pull-based, so the assignment is entirely irregular.
  2. Therefore, the mapping from a split to a key group cannot be built until the split is discovered and assigned.
  3. One possible solution is to assign each split to a key group before the job starts, so that the split is bound to a specific source operator. However, this might not be compatible with all Sources, e.g., the File Source.
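To make the static side of this paradox concrete, the sketch below computes the key group range of each subtask the way Flink's KeyGroupRangeAssignment does (an even range partitioning of [0, maxParallelism)). The point is that these ranges are fixed before any split has been discovered, so they cannot anticipate which splits will arrive:

```java
// Sketch: static key-group range per subtask, mirroring Flink's
// KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex.
// The ranges are fully determined before the job runs.
public class StaticKeyGroupRanges {
    /** Returns {startKeyGroup, endKeyGroup} (inclusive) for one subtask. */
    static int[] rangeForSubtask(int maxParallelism, int parallelism, int subtaskIndex) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 8, parallelism = 3;
        for (int i = 0; i < parallelism; i++) {
            int[] r = rangeForSubtask(maxParallelism, parallelism, i);
            System.out.println("subtask " + i + " -> key groups [" + r[0] + ", " + r[1] + "]");
        }
    }
}
```

With maxParallelism = 8 and parallelism = 3, the subtasks get [0, 2], [3, 5], and [6, 7] respectively, regardless of how many splits each subtask later receives.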

...

As we know, the KeyGroupRange of each operator is computed statically before the job is executed, while splits are discovered and assigned dynamically at runtime. The split assignment can be push-based or pull-based depending on the connector. Therefore, the mapping from Split to KeyGroup must be dynamic, which means the user cannot provide a static mapping from Split to KeyGroup; the mapping has to be built and maintained by Flink at runtime.

Therefore, the new "KeyGroupRangeAssignment" would now be a task-level field. It is responsible for the following:

  1. Whenever a split is added to or removed from the SourceOperator, the "Assigner" builds a new mapping from the new split to an unused KeyGroup, or deletes the corresponding mapping for the removed split.
  2. Providing a mapping from the record/key to a KeyGroup according to the split it belongs to.
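The two responsibilities above can be sketched as a small task-level component. This is a hypothetical illustration, not an existing Flink class; the names SplitToKeyGroupAssigner, addSplit, removeSplit, and keyGroupFor are invented here:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the task-level "Assigner": it binds each live split
// to an unused key group within the subtask's key group range, and answers
// key-group lookups per split. Names are illustrative, not a Flink API.
public class SplitToKeyGroupAssigner {
    private final Map<String, Integer> splitToKeyGroup = new HashMap<>();
    private final Deque<Integer> freeKeyGroups = new ArrayDeque<>();

    public SplitToKeyGroupAssigner(int firstKeyGroup, int lastKeyGroup) {
        for (int kg = firstKeyGroup; kg <= lastKeyGroup; kg++) {
            freeKeyGroups.add(kg);
        }
    }

    /** Called when the SourceOperator receives a new split. */
    public int addSplit(String splitId) {
        if (freeKeyGroups.isEmpty()) {
            throw new IllegalStateException(
                    "More splits than key groups; raise the maximum parallelism.");
        }
        int kg = freeKeyGroups.poll();
        splitToKeyGroup.put(splitId, kg);
        return kg;
    }

    /** Called when a split is removed; its key group becomes reusable. */
    public void removeSplit(String splitId) {
        Integer kg = splitToKeyGroup.remove(splitId);
        if (kg != null) {
            freeKeyGroups.add(kg);
        }
    }

    /** Maps a record's split to the key group its state lives in. */
    public int keyGroupFor(String splitId) {
        return splitToKeyGroup.get(splitId);
    }
}
```

Note how the "more splits than key groups" failure mode corresponds directly to the IMPORTANT caveat in the Javadoc above.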

The user then needs to provide Flink with a mapping, which is,

                                                        Record → Key → SplitID

  1. The mapping from a record to its key can be done by the KeySelector, which the keyed state backend needs in order to maintain the state.
  2. The mapping from a record or a key to the SplitID is necessary for the "Assigner" to assign the KeyGroup.

Therefore, the API could be,

Code Block
language: java
title: DataStream
/**
 * ...
 * 
 *
 * @param keySelector Function that defines how keys are extracted from the data stream.
 * @param <K> Type of the extracted keys.
 * @return The reinterpretation of the {@link DataStream} as a {@link KeyedStream}.
 */
public <K> KeyedStream<T, K> reinterpretAsKeyedStream(KeySelector<T, K> keySelector, KeyToSplit<K, String> keyToSplit) {}
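As a plain-Java illustration of the Record → Key → SplitID chain the user would supply, the sketch below stands in for the KeySelector and the proposed KeyToSplit. The method names and the Kafka-style partitioning are assumptions for illustration only:

```java
// Hypothetical sketch of the user-provided chain Record -> Key -> SplitID.
// keyFor stands in for the KeySelector; splitIdFor stands in for the
// proposed KeyToSplit<K, String>. Neither is an existing Flink API.
public class RecordToSplitChain {
    /** Record -> Key: what the KeySelector does. Records here are "userId,action". */
    static String keyFor(String record) {
        return record.split(",")[0]; // e.g. "alice,click" -> key "alice"
    }

    /**
     * Key -> SplitID: what KeyToSplit would do. The user must replicate the
     * external system's partitioner (here, a hash over four Kafka-style
     * partitions), which is exactly why this mapping can be hard to write.
     */
    static String splitIdFor(String key) {
        return "partition-" + Math.floorMod(key.hashCode(), 4);
    }

    public static void main(String[] args) {
        String record = "alice,click";
        System.out.println(keyFor(record) + " -> " + splitIdFor(keyFor(record)));
    }
}
```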

However, the mapping from a record or a key to the SplitID may be obscure to the user, since it requires knowledge of how the external system partitions its data.

...

Option 2. Assign the KeyGroup in the SourceOperator (preferred option)

...

Code Block
language: java
title: KeyedStateBackend
public interface KeyedStateBackend<K>
        extends KeyedStateFactory, PriorityQueueSetFactory, Disposable {
    /**
     * Sets the current key and its corresponding key group that are used for partitioned state.
     *
     * @param newKey The new current key.
     * @param keyGroup The key group of the new key.
     */
    void setCurrentKeyAndKeyGroup(K newKey, int keyGroup);
}
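To show how the proposed method changes record processing, here is a minimal sketch: the key group is taken from the record's split (as assigned by the task-level Assigner) instead of being derived from a hash of the key. The RecordingBackend is a fake stand-in for illustration, not a real state backend:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a keyed operator using the proposed setCurrentKeyAndKeyGroup:
// the key group comes from the split the record was read from, not from
// a hash of the key. RecordingBackend is a hypothetical stand-in.
public class SetKeyAndGroupExample {
    interface KeyedStateBackend<K> {
        void setCurrentKeyAndKeyGroup(K newKey, int keyGroup);
    }

    static class RecordingBackend implements KeyedStateBackend<String> {
        String currentKey;
        int currentKeyGroup;

        @Override
        public void setCurrentKeyAndKeyGroup(String newKey, int keyGroup) {
            this.currentKey = newKey;
            this.currentKeyGroup = keyGroup;
        }
    }

    public static void main(String[] args) {
        // Split-to-key-group mapping maintained by the task-level Assigner.
        Map<String, Integer> splitToKeyGroup = new HashMap<>();
        splitToKeyGroup.put("partition-0", 5);

        RecordingBackend backend = new RecordingBackend();
        // Processing one record: key extracted by the KeySelector,
        // key group looked up from the record's split.
        String key = "alice";
        int keyGroup = splitToKeyGroup.get("partition-0");
        backend.setCurrentKeyAndKeyGroup(key, keyGroup);
        System.out.println(backend.currentKey + " -> key group " + backend.currentKeyGroup);
    }
}
```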

...