...

Motivation

According to FLINK-8571 ("Provide an enhanced KeyedStream implementation to use ForwardPartitioner"), a DataStream can be reinterpreted as a KeyedStream with a ForwardPartitioner in the following two use cases.

...

However, one limitation applies to the first use case: if the input source stream is NOT partitioned exactly the same way Flink's keyBy would partition the data, the reinterpretation would cause a runtime error due to KeyGroup mismatch, as well as state inconsistency during rescaling.

Example 1. KeyGroup Mismatch


In this example, according to the hash function provided by Flink, the key group of key_0 is 64. However, the record is forwarded to a downstream operator that is only responsible for key groups 0 to 63, which does not include 64. The downstream operator would then try to read and update the state of key group 64, for which it is not responsible, and a runtime error would be triggered.
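The ownership check behind this mismatch can be sketched in plain Java. This is a minimal illustration, not Flink code: the class and method names are hypothetical, and the values maxParallelism = 128 and parallelism = 2 are assumptions chosen so that the first operator owns key groups 0 to 63, as in the example. The range arithmetic is believed to mirror what Flink's KeyGroupRangeAssignment does, but it should be checked against the Flink version in use.

```java
/** Hypothetical sketch of key group ownership; not part of Flink. */
public final class KeyGroupOwnershipSketch {

    /** First key group (inclusive) owned by the operator with the given index. */
    public static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by the operator with the given index. */
    public static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    /** True if the operator with the given index is responsible for the key group. */
    public static boolean owns(int maxParallelism, int parallelism, int operatorIndex, int keyGroup) {
        return keyGroup >= rangeStart(maxParallelism, parallelism, operatorIndex)
                && keyGroup <= rangeEnd(maxParallelism, parallelism, operatorIndex);
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed total number of key groups
        int parallelism = 2;      // assumed number of downstream subtasks
        int keyGroupOfKey0 = 64;  // key group of key_0, taken from the example

        // With a forward connection, key_0 reaches operator 0, which does not own key group 64.
        System.out.println("operator 0 owns key group 64: "
                + owns(maxParallelism, parallelism, 0, keyGroupOfKey0));
        System.out.println("operator 1 owns key group 64: "
                + owns(maxParallelism, parallelism, 1, keyGroupOfKey0));
    }
}
```

Under these assumptions, a record for key_0 that is forwarded to the first operator fails the ownership check, which corresponds to the runtime error described above.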

Example 2. State Inconsistency



In this example, splitA contains the keys assigned to key groups 0 to 10, splitB contains the keys assigned to key groups 11 to 63, and splitC contains the keys assigned to key groups 64 to 127. During rescaling, Operator State and Keyed State are redistributed independently, so there is no guarantee that the state remains consistent after rescaling.


This FLIP aims to address these problems and formally introduce reinterpretAsKeyedStream() to DataStream.


Public Interfaces

DataStream (Java)
/**
 * Reinterprets the given {@link DataStream} as a {@link KeyedStream}, which extracts keys with
 * the given {@link KeySelector}.
 *
 * <p>With the reinterpretation, the upstream operators can be chained with the downstream operator, which
 * avoids shuffling data between two subtasks. If shuffling is acceptable, {@link DataStream#keyBy(KeySelector)}
 * is recommended.
 *
 * <p>IMPORTANT: If the base stream is not inside the SourceTask, this method works correctly if the following
 * conditions are satisfied.
 *
 * <ul>
 *     <li>The stream has previously been partitioned with {@link DataStream#keyBy(KeySelector)}.
 *     <li>The selected keys of the base stream have not changed since {@link DataStream#keyBy(KeySelector)}.
 *     <li>No data has been shuffled since {@link DataStream#keyBy(KeySelector)}.
 * </ul>
 *
 * <p>Overall, the data stream has to be partitioned exactly in the same way as if it was
 * created through a {@link DataStream#keyBy(KeySelector)}.
 *
 * <p>IMPORTANT: If the base stream is inside the SourceTask, the data stream from the external system has
 * already been partitioned and is a so-called "pre-KeyedStream". This method works correctly if the following
 * conditions are satisfied.
 *
 * <ul>
 *     <li>The base stream has been partitioned in the external system. The stream does not need to be
 *     partitioned exactly in the same way Flink's keyBy would partition the data.
 *     <li>Each selected key of the base stream exists in only one {@link SourceSplit}. If a key exists in
 *     multiple {@link SourceSplit}s, the stream is not a "pre-KeyedStream", and using this method could
 *     cause state inconsistency.
 *     <li>The selected keys of the base stream remain the same as in the input source stream from the
 *     external system.
 * </ul>
 *
 * @param keySelector Function that defines how keys are extracted from the data stream.
 * @param <K> Type of the extracted keys.
 * @return The reinterpretation of the {@link DataStream} as a {@link KeyedStream}.
 */
public <K> KeyedStream<T, K> reinterpretAsKeyedStream(KeySelector<T, K> keySelector) {}

/**
 * Reinterprets the given {@link DataStream} as a {@link KeyedStream}, which extracts keys with
 * the given {@link KeySelector}.
 *
 * <p>With the reinterpretation, the upstream operators can be chained with the downstream operator, which
 * avoids shuffling data between two subtasks. If shuffling is acceptable, {@link DataStream#keyBy(KeySelector)}
 * is recommended.
 *
 * <p>IMPORTANT: If the base stream is not inside the SourceTask, this method works correctly if the following
 * conditions are satisfied.
 *
 * <ul>
 *     <li>The stream has previously been partitioned with {@link DataStream#keyBy(KeySelector)}.
 *     <li>The selected keys of the base stream have not changed since {@link DataStream#keyBy(KeySelector)}.
 *     <li>No data has been shuffled since {@link DataStream#keyBy(KeySelector)}.
 * </ul>
 *
 * <p>Overall, the data stream has to be partitioned exactly in the same way as if it was
 * created through a {@link DataStream#keyBy(KeySelector)}.
 *
 * <p>IMPORTANT: If the base stream is inside the SourceTask, the data stream from the external system has
 * already been partitioned and is a so-called "pre-KeyedStream". This method works correctly if the following
 * conditions are satisfied.
 *
 * <ul>
 *     <li>The base stream has been partitioned in the external system. The stream does not need to be
 *     partitioned exactly in the same way Flink's keyBy would partition the data.
 *     <li>Each selected key of the base stream exists in only one {@link SourceSplit}. If a key exists in
 *     multiple {@link SourceSplit}s, the stream is not a "pre-KeyedStream", and using this method could
 *     cause state inconsistency.
 *     <li>The selected keys of the base stream remain the same as in the input source stream from the
 *     external system.
 * </ul>
 *
 * @param keySelector Function that defines how keys are extracted from the data stream.
 * @param typeInfo Explicit type information about the key type.
 * @param <K> Type of the extracted keys.
 * @return The reinterpretation of the {@link DataStream} as a {@link KeyedStream}.
 */
public <K> KeyedStream<T, K> reinterpretAsKeyedStream(KeySelector<T, K> keySelector, TypeInformation<K> typeInfo) {}

Proposed Changes

Before everything else, we need to give a definition for pre-KeyedStream.

...

There are two options for solving the problem introduced above.

Option 1. Re-define KeyGroupRangeAssignment

The KeyGroupRangeAssignment does two main things.

  • Assigns a key group for a key.
  • Assigns the key to a downstream operator based on its key group and the predefined downstream operator's key group range.
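The two steps above can be sketched as follows. This is a simplified illustration with hypothetical names, not Flink's implementation. In particular, the hash in step 1 is a stand-in (Flink applies a murmur hash to the key's hashCode), and the example values for maxParallelism and parallelism are assumptions.

```java
/** Hypothetical sketch of the two-step key routing; not part of Flink. */
public final class KeyGroupRoutingSketch {

    /**
     * Step 1: map a key to a key group in [0, maxParallelism).
     * Stand-in hash only: Flink uses a murmur hash of key.hashCode() instead.
     */
    public static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Step 2: map a key group to the index of the downstream operator that owns it. */
    public static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Both steps combined: the downstream operator index that keyBy would route the key to. */
    public static int assignKeyToOperator(Object key, int maxParallelism, int parallelism) {
        int keyGroup = assignToKeyGroup(key, maxParallelism);
        return operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed number of key groups
        int parallelism = 2;      // assumed number of downstream subtasks
        for (int key : new int[] {63, 64, 200}) {
            System.out.println("key " + key
                    + " -> key group " + assignToKeyGroup(key, maxParallelism)
                    + " -> operator " + assignKeyToOperator(key, maxParallelism, parallelism));
        }
    }
}
```

Option 1 amounts to replacing step 1 with a user-provided assignment while keeping step 2 unchanged.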

Since the input source stream is already grouped by key, the external system assigns the keys to splits (which play a role similar to key groups), probably in a different way from Flink. For example, the first 100 keys seen are assigned to the first split. Therefore, we could try to substitute the key group assignment with a user-provided one to avoid the KeyGroup mismatch problem. Also, we have to make sure that keys from different splits are not assigned to the same key group since no ...


It should be able to give us the following information:

...

  1. Users can only provide a function mapping from a key to a split, because the key group should be transparent to the user. Therefore, the mapping of a split to a key group has to be established by Flink itself. However, the paradox is that for a specific subtask, the key group range assignment is static, while the split assignment is dynamic and irregular across different sources. For example, the split assignment for the Kafka Source is push-based and assigns new splits to source readers in a round-robin fashion. Another example is the File Source, whose split assignment is pull-based, which means that the split assignment is irregular. Therefore, the mapping of a split to a key group is dynamic.
  2. The mapping of a key to a split could still be hard for the user to provide, since it may be established automatically by the external system.
  3. The key group range needs to be as large as the number of currently existing splits. However, the number of currently existing splits could also be dynamic, while the number of key groups has to be defined before the job starts. Therefore, users have to estimate the largest number of splits that exist at any moment during the whole lifecycle of the job. For a pre-partitioned stream, the number of splits should be fixed to a small range or even a specific value, since the number of keys should be far smaller than the number of records in most scenarios.
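Points 1 and 3 above can be illustrated with a small sketch. Everything here is hypothetical and exists only for illustration: the class name, the use of string split ids, and the policy of giving each newly seen split the next free key group are assumptions, not part of Flink or of this proposal. The sketch shows that a split-to-key-group mapping built at runtime depends on the order in which splits appear, and that it breaks once the number of splits exceeds the pre-defined number of key groups.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of a dynamic split-to-key-group mapping; not part of Flink. */
public final class SplitKeyGroupRegistrySketch {

    private final int numberOfKeyGroups; // fixed before the job starts
    private final Map<String, Integer> keyGroupBySplit = new LinkedHashMap<>();

    public SplitKeyGroupRegistrySketch(int numberOfKeyGroups) {
        this.numberOfKeyGroups = numberOfKeyGroups;
    }

    /** Returns the key group of a split, assigning the next free key group on first sight. */
    public int keyGroupFor(String splitId) {
        Integer existing = keyGroupBySplit.get(splitId);
        if (existing != null) {
            return existing;
        }
        if (keyGroupBySplit.size() >= numberOfKeyGroups) {
            // More splits than key groups: the user's estimate was too small.
            throw new IllegalStateException("No free key group left for split " + splitId);
        }
        int next = keyGroupBySplit.size();
        keyGroupBySplit.put(splitId, next);
        return next;
    }

    public static void main(String[] args) {
        SplitKeyGroupRegistrySketch registry = new SplitKeyGroupRegistrySketch(2);
        System.out.println("splitA -> " + registry.keyGroupFor("splitA"));
        System.out.println("splitB -> " + registry.keyGroupFor("splitB"));
        try {
            registry.keyGroupFor("splitC");
        } catch (IllegalStateException e) {
            System.out.println("splitC rejected: " + e.getMessage());
        }
    }
}
```

Because the resulting mapping depends on discovery order, it cannot be expressed as a static function that users supply up front, which is the difficulty described in point 1.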

Option 2. Assign KeyGroup by the SourceOperator (Preferred)

In this option, we would like to address problems 1 and 2 introduced above. The mapping of a key to a split and of a split to a key group can be established inside the Source Operator.

...