...

This FLIP aims to address these problems and formally introduce reinterpretAsKeyedStream() to DataStream.

Public Interfaces

Anchor
reinterpretAsKeyedStream

Code Block
languagejava
titleDataStream
/**
 * Reinterprets the given {@link DataStream} as a {@link KeyedStream}, which extracts keys with
 * the given {@link KeySelector}.
 *
 * <p>With the reinterpretation, the upstream operators can be chained with the downstream operators, thereby
 * avoiding data shuffling and network consumption between two subtasks. If shuffling or network consumption is
 * acceptable, {@link DataStream#keyBy(KeySelector)} is recommended.
 *
 * <p>IMPORTANT: If the base stream is not inside the SourceTask, this method works correctly only if the
 * following conditions are satisfied.
 *
 * <ul>
 *     <li>The base stream has been partitioned with {@link DataStream#keyBy(KeySelector)}.
 *     <li>The values of the selected keys of the base stream have not changed since {@link DataStream#keyBy(KeySelector)}.
 *     <li>No data shuffling has occurred since {@link DataStream#keyBy(KeySelector)}.
 * </ul>
 *
 * <p>Overall, the data stream has to be partitioned exactly in the same way as if it was
 * created through a {@link DataStream#keyBy(KeySelector)}.
 *
 * <p>IMPORTANT: If the base stream is inside the SourceTask, meaning that the data stream has already been
 * partitioned by the external system (a so-called "pre-KeyedStream"), this method works correctly only if the
 * following conditions are satisfied.
 *
 * <ul>
 *     <li>The base stream has been partitioned in the external system, but it does not need to be
 *     partitioned exactly in the same way Flink's keyBy would partition the data.
 *     <li>Each selected key of the base stream must exist in only one {@link SourceSplit}. If a key exists in
 *     multiple {@link SourceSplit}s, the stream should not be considered a "pre-KeyedStream", and using this
 *     method could cause state inconsistency.
 *     <li>The values of the selected keys of the base stream remain the same as in the input stream from the external system.
 * </ul>
 *
 * <p>IMPORTANT: For a "pre-KeyedStream", the maximum number of currently existing splits must not be larger than
 * the maximum number of key-groups, since every split is mapped to exactly one key-group. To configure the number
 * of key-groups, configure the maximum supported parallelism.
 *
 * @param keySelector Function that defines how keys are extracted from the data stream.
 * @param <K> Type of the extracted keys.
 * @return The reinterpretation of the {@link DataStream} as a {@link KeyedStream}.
 */
public <K> KeyedStream<T, K> reinterpretAsKeyedStream(KeySelector<T, K> keySelector) {}

/**
 * Reinterprets the given {@link DataStream} as a {@link KeyedStream}, which extracts keys with
 * the given {@link KeySelector}.
 *
 * <p>With the reinterpretation, the upstream operators can be chained with the downstream operators, thereby
 * avoiding data shuffling between two subtasks. If shuffling is acceptable, {@link DataStream#keyBy(KeySelector)}
 * is recommended.
 *
 * <p>IMPORTANT: If the base stream is not inside the SourceTask, this method works correctly only if the
 * following conditions are satisfied.
 *
 * <ul>
 *     <li>The base stream has been partitioned with {@link DataStream#keyBy(KeySelector)}.
 *     <li>The values of the selected keys of the base stream have not changed since {@link DataStream#keyBy(KeySelector)}.
 *     <li>No data shuffling has occurred since {@link DataStream#keyBy(KeySelector)}.
 * </ul>
 *
 * <p>Overall, the data stream has to be partitioned exactly in the same way as if it was
 * created through a {@link DataStream#keyBy(KeySelector)}.
 *
 * <p>IMPORTANT: If the base stream is inside the SourceTask, meaning that the data stream has already been
 * partitioned by the external system (a so-called "pre-KeyedStream"), this method works correctly only if the
 * following conditions are satisfied.
 *
 * <ul>
 *     <li>The base stream has been partitioned in the external system, but it does not need to be
 *     partitioned exactly in the same way Flink's keyBy would partition the data.
 *     <li>Each selected key of the base stream must exist in only one {@link SourceSplit}. If a key exists in
 *     multiple {@link SourceSplit}s, the stream should not be considered a "pre-KeyedStream", and using this
 *     method could cause state inconsistency.
 *     <li>The values of the selected keys of the base stream remain the same as in the input stream from the external system.
 * </ul>
 *
 * <p>IMPORTANT: For a "pre-KeyedStream", the maximum number of currently existing splits must not be larger than
 * the maximum number of key-groups, since every split is mapped to exactly one key-group. To configure the number
 * of key-groups, configure the maximum supported parallelism.
 *
 * @param keySelector Function that defines how keys are extracted from the data stream.
 * @param typeInfo Explicit type information about the key type.
 * @param <K> Type of the extracted keys.
 * @return The reinterpretation of the {@link DataStream} as a {@link KeyedStream}.
 */
public <K> KeyedStream<T, K> reinterpretAsKeyedStream(KeySelector<T, K> keySelector, TypeInformation<K> typeInfo) {}
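
For illustration, below is a minimal usage sketch assuming the proposed API is adopted. The Kafka source (kafkaSource), the Event POJO with getUserId(), and MyKeyedProcessFunction are hypothetical names used only for this example.

Code Block
languagejava
titleExample usage (sketch)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Hypothetical source whose splits are already partitioned by user id in the
// external system, i.e. a "pre-KeyedStream".
DataStream<Event> source =
        env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "pre-keyed events");

// Reinterpret the pre-partitioned stream as a KeyedStream without a keyBy() shuffle,
// so the source operator and the downstream operator can be chained in one subtask.
KeyedStream<Event, String> keyed =
        source.reinterpretAsKeyedStream(Event::getUserId, Types.STRING);

keyed.process(new MyKeyedProcessFunction()).print();

env.execute("reinterpretAsKeyedStream example");

Compared with keyBy(), no network shuffle is introduced here, at the cost of the caller guaranteeing the pre-partitioning conditions listed above.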

...

But I believe this is acceptable to the user, since the source operators and the downstream operators are now chained together, which means that although memory usage could increase a bit, network consumption is saved. Another motivation for such a wrapped KeyedStreamRecord is that the key and KeyGroup are currently selected and computed twice, once in the upstream operator and once in the downstream operator. This unnecessary overhead can be even larger when the keys of the stream do not change within a task. Savings can also be made by passing the keyed information to downstream operators in use cases where the key selection is complicated, but that is a separate topic and this FLIP will not discuss it further.
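
As a rough illustration of such a wrapped record (the class and field names below are a sketch of the idea, not part of this FLIP's proposed interfaces), the key and key group computed in the upstream operator could be carried along with the value so that chained downstream operators do not recompute them:

Code Block
languagejava
titleKeyedStreamRecord (sketch)
// Illustrative sketch only: a record wrapper carrying the key and key group
// computed once upstream, so chained downstream operators can reuse them
// instead of re-running the KeySelector and the key-group hash.
public final class KeyedStreamRecord<T, K> {
    private final T value;       // the user-facing element
    private final K key;         // key extracted once by the upstream operator
    private final int keyGroup;  // key group assigned once for the key

    public KeyedStreamRecord(T value, K key, int keyGroup) {
        this.value = value;
        this.key = key;
        this.keyGroup = keyGroup;
    }

    public T getValue() { return value; }

    public K getKey() { return key; }

    public int getKeyGroup() { return keyGroup; }
}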

One more change is that the key group is now computed and assigned statically. However, the KeyedStateBackend currently provides no way to set the key group directly, so the following method is added:

Code Block
languagejava
titleKeyedStateBackend
public interface KeyedStateBackend<K>
        extends KeyedStateFactory, PriorityQueueSetFactory, Disposable {
    /**
     * Sets the current key and its corresponding key group to be used for partitioned state.
     *
     * @param newKey The new current key.
     * @param keyGroup The key group of the new key.
     */
    void setCurrentKeyAndKeyGroup(K newKey, int keyGroup);
}
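
As a sketch of how this method might be used (the surrounding operator code is illustrative and not part of the proposed interfaces), an operator consuming a "pre-KeyedStream" would pass the statically assigned key group of the current split together with the extracted key before accessing keyed state:

Code Block
languagejava
titleUsage sketch
// Illustrative sketch: processing an element from a split whose key group was
// assigned statically. The key group is handed to the backend instead of being
// recomputed from the key's hash.
void processElement(StreamRecord<Event> element, int splitKeyGroup) throws Exception {
    Event value = element.getValue();
    String key = keySelector.getKey(value);

    // Set both the current key and its statically assigned key group.
    keyedStateBackend.setCurrentKeyAndKeyGroup(key, splitKeyGroup);

    // Keyed state accessed from here on belongs to (key, splitKeyGroup).
}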

Rescaling

For a "pre-KeyedStream", the redistribution of keyed state of downstream operators of SourceTask depends on the source operator. Therefore, the mapping of Split to KeyGroup would be persistently restored and extracted for redistribution.

...