Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • The input source stream is already partitioned in the external system.
    • For example, user data have already been grouped by the user ID in the external system.
  • The DataStream is already grouped by key previously with the keyBy().
    • For example, a record with three fields, (key, val_1, val_2), can be grouped first by the key with the keyBy() operation. Then, the data can be summed with val_1 and val_2, separately. 
    • A use case in the user ML is case 1.

However, one limitation is that for the first use case, if the input source stream is NOT partitioned exactly the same way Flink's keyBy would partition the data, the reinterpretation would cause the runtime error since KeyGroup mismatch and state inconsistency during rescaling.

...

.

...


In this example, we can see that according to the hash function provided used by the Flink, the key group of key_0 is 64. However, it is forwarded to the downstream operator, who only takes responsibility for key groups from 0 to 63, but not including 64. The downstream operator would then try to extract and update the state of KeyGroup 64, which it doesn't take responsibility for. Therefore, a runtime error would be triggered.

Example 2. State Inconsistency

Image Removed

In this example, the splitA contains the keys that assign to the key group from 0 to 10, the splitB contains the keys that assign to the key group from 11 to 63, the splitC contains the keys that assign to the key group from 64 to 127. During rescaling, the redistribution of Operator State and Keyed State is independent, which means that there is no guarantee that the state could still remain consistent after rescaling.

This FLIP would like to address these problems and introduce the reinterpretAsKeyedStream() to the DataStream formally.

Public Interfaces

...


This FLIP would like to address this problem and introduce the reinterpretAsKeyedStream() to the DataStream formally.

Public Interfaces

Anchor
reinterpretAsKeyedStream
reinterpretAsKeyedStream

Code Block
languagejava
titleDataStream
/**
 * Reinterprets the given {@link DataStream} as a {@link KeyedStream}, which extracts keys with
 * the given {@link KeySelector}.
 *
 * <p>With the reinterpretation, the upstream operators can now be chained with the downstream operator and therefore
 * avoiding shuffling data and network consumption between two subtasks. If shuffling or network consumption is acceptable, 
 * {@link DataStream#keyBy(KeySelector)} is recommended.
 *
 * <p>IMPORTANT: If the base stream is not in the SourceTask, this method works well if the following conditions
 * are satisfied

...

Code Block
languagejava
titleDataStream
/**
 * Reinterprets the given {@link DataStream} as a {@link KeyedStream}, which extracts keys with
 * the given {@link KeySelector}.
 *
 * <p>With<ul>
 the* reinterpretation, the upstream operators can<li>The now be chainedbase stream has ever been partitioned with the downstream operator and therefore{@link DataStream#keyBy(KeySelector)}.
 * avoiding shuffling data and network<li>The consumptionvalue betweenof twoselected subtasks.keys Ifof shufflingthe orbase networkstream consumptionhave isnot acceptable,changed 
since * {@link DataStream#keyBy(KeySelector)} is recommended.
 *
 * <p>IMPORTANT: If the base<li>No streamdata isshuffling notsince inside the SourceTask, this method works well if the following conditions{@link DataStream#keyBy(KeySelector)}.
 * are satisfied.</ul>
 *
 * <ul>
<p>Overall, *the data stream has to <li>The base stream has ever been partitioned with thebe partitioned exactly in the same way as if it was
 * created through a {@link DataStream#keyBy(KeySelector)}.
 *
 * <p>IMPORTANT: If the base stream is <li>Thein valuethe ofSourceTask, selectedwhich keysmeans ofthat the basedata stream havefrom notthe changed since {@link DataStream#keyBy(KeySelector)}.
 *     <li>No data shuffling since {@link DataStream#keyBy(KeySelector)}.
 * </ul>
 *
 * <p>Overall, the data stream has to be partitioned exactlyexternal
 * system has been partitioned and so-called "pre-KeyedStream". This method works well if the following conditions
 * are satisfied.
 *
 * <ul>
 *     <li>The base stream has been partitioned in the sameexternal waysystem. asBut ifthe itstream was
does *not createdneed through a {@link DataStream#keyBy(KeySelector)}.to be
 *
 * <p>IMPORTANT: If the basepartitioned streamexactly isin inside the SourceTask,same whichway meansFlink's thatkeyBy the data stream fromwould partition the externaldata.
 * system has been partitioned and<li>Selected so-called "pre-KeyedStream". This method works well if the following conditions
 * are satisfied.
 *
 * <ul>
 *     <li>The base stream has been partitioned in the external system. But the stream does not need to bekeys of the base stream could only exist in one {@link SourceSplit}. If it exists in multiple
 *     {@link SourceSplit}s, the stream should not be called "pre-KeyedStream". And therefore use this method could
 *     cause state inconsistency.
 *     partitioned exactly in<li>The value of selected keys of the base stream remain the same as waythe Flink'sinput keyBysource wouldstream partitionfrom the external datasystem.
 * </ul>
 *
 * <p>IMPORTANT: <li>SelectedFor keys ofa "pre-KeyedStream", the basemaximum streamnumber couldof onlycurrently existexisting insplits onemust {@link SourceSplit}. If it exists in multiplenot be larger than the maximum
 * number of key-groups. Since {@link SourceSplit}s, the stream should not be called "pre-KeyedStream". And therefore use this method could
 *     cause state inconsistency.
 *     <li>The value of selected keys of the base stream remain the same as the input source stream from the external system.
 * </ul>
 *
 * <p>IMPORTANT: For a "pre-KeyedStream", the maximum number of currently existing splits must not be larger than the maximum
 * number of key-groups. Since every split would be mapped to one key-group. To config the number of key-group, please config the
 * maximum supported parallelism.
 *
 * @param keySelector Function that defines how keys are extracted from the data stream.
 * @param <K> Type of the extracted keys.
 * @return The reinterpretation of the {@link DataStream} as a {@link KeyedStream}.
 */
public <K> KeyedStream<T, K> reinterpretAsKeyedStream(KeySelector<T, K> keySelector) {}

/**
 * Reinterprets the given {@link DataStream} as a {@link KeyedStream}, which extracts keys with
 * the given {@link KeySelector}.
 *
 * <p>With the reinterpretation, the upstream operators can now be chained with the downstream operator and therefore
 * avoiding shuffling data between two subtasks. If shuffling is acceptable,every split would be mapped to one key-group. To config the number of key-group, please config the
 * maximum supported parallelism.
 *
 * @param keySelector Function that defines how keys are extracted from the data stream.
 * @param <K> Type of the extracted keys.
 * @return The reinterpretation of the {@link DataStream} as a {@link KeyedStream}.
 */
public <K> KeyedStream<T, K> reinterpretAsKeyedStream(KeySelector<T, K> keySelector) {}

/**
 * Reinterprets the given {@link DataStream} as a {@link KeyedStream}, which extracts keys with
 * the given {@link KeySelector}.
 *
 * <p>With the reinterpretation, the upstream operators can now be chained with the downstream operator and therefore
 * avoiding shuffling data and network consumption between two subtasks. If shuffling or network consumption is acceptable, 
 * {@link DataStream#keyBy(KeySelector)} is recommended.
 *
 * <p>IMPORTANT: If the base stream is not in the SourceTask, this method works well if the following conditions
 * are satisfied.
 *
 * <ul>
 *     <li>The base stream has ever been partitioned with the {@link DataStream#keyBy(KeySelector)}.
 *     <li>The value of selected keys of the base stream have not changed since {@link DataStream#keyBy(KeySelector)}.
 *     <li>No data shuffling since {@link DataStream#keyBy(KeySelector)}.
 * is recommended.</ul>
 *
 * <p>IMPORTANT: If<p>Overall, the basedata stream ishas to notbe insidepartitioned theexactly SourceTask,in thisthe methodsame worksway wellas if theit following conditionswas
 * arecreated satisfied.
 *
 * <ul>through a {@link DataStream#keyBy(KeySelector)}.
 *
 * <p>IMPORTANT: If  <li>Thethe base stream hasis everin beenthe partitionedSourceTask, withwhich themeans {@link DataStream#keyBy(KeySelector)}.
 *     <li>The value of selected keys of the base stream have not changed since {@link DataStream#keyBy(KeySelector)}.
 *     <li>No data shuffling since {@link DataStream#keyBy(KeySelector)}.
 * </ul>
 *
 * <p>Overall, the data stream has to be partitioned exactly in the same way as if it was
 * created through a {@link DataStream#keyBy(KeySelector)}.
 *
 * <p>IMPORTANT: If the base stream is inside the SourceTask, which means that the data stream from the external
 * system has been partitioned and so-called "pre-KeyedStream". This method works well if the following conditions
 * are satisfied.
 *
 * <ul>
 *     <li>The base stream has been partitioned in the external system. But the stream does not need to bethat the data stream from the external
 * system has been partitioned and so-called "pre-KeyedStream". This method works well if the following conditions
 * are satisfied.
 *
 * <ul>
 *     <li>The base stream has been partitioned in the external system. But the stream does not need to be
 *     partitioned exactly in the same way Flink's keyBy would partition the data.
 *     <li>Selected keys of the base stream could only exist in one {@link SourceSplit}. If it exists in multiple
 *     {@link SourceSplit}s, the stream should not be called "pre-KeyedStream". And therefore use this method could
 *     cause state inconsistency.
 *     partitioned exactly in<li>The value of selected keys of the base stream remain the same as waythe Flink'sinput keyBysource wouldstream partitionfrom the external datasystem.
 * </ul>
 *
 * <p>IMPORTANT: <li>SelectedFor keys ofa "pre-KeyedStream", the basemaximum streamnumber couldof onlycurrently existexisting insplits onemust {@link SourceSplit}. If it exists in multiplenot be larger than the maximum
 * number of key-groups. Since {@link SourceSplit}s, the stream should not be called "pre-KeyedStream". And therefore use this method could
 *     cause state inconsistency.
 *     <li>The value of selected keys of the base stream remain the same as the input source stream from the external system.
 * </ul>
 *
 * <p>IMPORTANT: For a "pre-KeyedStream", the maximum number of currently existing splits must not be larger than the maximum
 * number of key-groups. Since every split would be mapped to one key-group. To config the number of key-group, please config the
 * maximum supported parallelism.
 *
 * @param keySelector Function that defines how keys are extracted from the data stream.
 * @param typeInfo Explicit type information about the key type.
 * @param <K> Type of the extracted keys.
 * @return The reinterpretation of the {@link DataStream} as a {@link KeyedStream}.
 */
public <K> KeyedStream<T, K> reinterpretAsKeyedStream(KeySelector<T, K> keySelector, TypeInformation<K> typeInfo) {}

Usage Examples

Code Block
DataStream<Tuple<String, Integer, Integer>> input = fromSource(...);
// If the input source stream is not pre-KeyedStream
// Two aggregation operators would be chained together.
DataStream<Tuple<String, Integer, Integer>> stream = input.keyBy(value -> value.f0)
														  .sum(1)
														  .reinterpretAsKeyedStream(value -> value.f0)
														  .sum(2);
// If the input source stream is pre-KeyedStream
// The source operator and the fowllwing aggregation operators would be chained together. 
DataStream<Tuple<String, Integer, Integer>> stream = input.reinterpretAsKeyedStream(value -> value.f0)
														  .sum(1)
														  .reinterpretAsKeyedStream(value -> value.f0)
														  .sum(2);

Proposed Changes

There are two options for solving the problems introduced above.

Option 1. Re-define KeyGroupRangeAssignment

The KeyGroupRangeAssignment does two main things.

  • Assigns a key group for a key.
  • Assigns the key to a downstream operator based on its key group and the predefined downstream operator's key group range.
every split would be mapped to one key-group. To config the number of key-group, please config the
 * maximum supported parallelism.
 *
 * @param keySelector Function that defines how keys are extracted from the data stream.
 * @param <K> Type of the extracted keys.
 * @return The reinterpretation of the {@link DataStream} as a {@link KeyedStream}.
 */
public <K> KeyedStream<T, K> reinterpretAsKeyedStream(KeySelector<T, K> keySelector, TypeInformation<K> typeInfo) {}

Usage Examples

Code Block
DataStream<Tuple<String, Integer, Integer>> input = fromSource(...);
// If the input source stream is not pre-KeyedStream
// Two aggregation operators would be chained together.
DataStream<Tuple<String, Integer, Integer>> stream = input.keyBy(value -> value.f0)
														  .sum(1)
														  .reinterpretAsKeyedStream(value -> value.f0)
														  .sum(2);
// If the input source stream is pre-KeyedStream
// The source operator and the fowllwing aggregation operators would be chained together. 
DataStream<Tuple<String, Integer, Integer>> stream = input.reinterpretAsKeyedStream(value -> value.f0)
														  .sum(1)
														  .reinterpretAsKeyedStream(value -> value.f0)
														  .sum(2);

Proposed Changes

There are two options for solving the problem introduced above.

Option 1. Re-define KeyGroupRangeAssignment

The KeyGroupRangeAssignment does two main things.

  • Assigns a key group for a key.
  • Assigns the key to a downstream operator based on its key group and the predefined downstream operator's key group range.

Since the input source stream is already grouped by key and the external system would assign the keys to one split probably in a different way from the Flink. For example, the first 100 keys seen are assigned to the first split. Therefore, we could try to replace the key group assignment with a user-provided one to avoid the problem Since the input source stream is already grouped by key and the external system would assign the keys to one split probably in a different way from the Flink. For example, the first 100 keys seen are assigned to the first split. Therefore, we could try to replace the key group assignment with a user-provided one to avoid the problem of the KeyGroup mismatch. Also, we have to make sure that the keys from different splits would not assign to the same key group since, during rescaling, the state of the source operator has to keep pace with the keyed state of downstream operators.

...

  1. Users can only provide a function mapping from a record to a split. Since the key group should be transparent to the user, the mapping of Split to KeyGroup has to be established by the Flink.
    1. However, the paradox is that for a specific subtask, the key group range assignment is static while the split assignment is actually dynamic and irregular by different Sources. For example, the split assignment for Kafka Source is push-based, which would round-robin assigning the new split to source readers. Another example is that the split assignment for File Source is pull-based, which means that the split assignment is totally irregular. 
    2. Therefore, it is also hard for Flink to build such a mapping statically.
  2. The mapping of a key to a split could still be hard for the user to provide since it can be done automatically through an external system, e.g., Kafka.
  3. The maximum number of currently existing splits must not be larger than the maximum number of key groups. Because every split would be mapped to one key group. 

Option 2. Assigned KeyGroup by the SourceOperator(Prefer one)

...

Another motivation of such wrapped KeyedStreamRecord is that the key and KeyGroup are selected and computed twice in both of the upstream and downstream operators currently. Once for selecting a downstream channel in the upstream operator and once for selecting the partitioned state in the downstream operator. Savings can be made by passing the keyed information to downstream operators with some use cases that the key selection is complicated. But it's another topic and this FLIP would not discuss this too much.

...

Code Block
languagejava
titleKeyedStateBackend
public interface KeyedStateBackend<K>
        extends KeyedStateFactory, PriorityQueueSetFactory, Disposable {
	/**
     * Sets the current key and its corresponding keyGroup that are used for partitioned state.
     *
     * @param newKey The new current key.
     * @param keyGroup The keyGroup of added new key.
     */
	void setCurrentKeyAndKeyGroup(K newKey, int keyGroup);}

Rescaling

For a "pre-KeyedStream", the redistribution of keyed state of downstream operators of SourceTask depends on the source operator. Therefore, the mapping of Split to KeyGroup would be persistently restored and extracted for redistribution.

Conclusion

Test Plan

Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

...

;}

Rescaling

For a "pre-KeyedStream", the redistribution of keyed state of downstream operators of SourceTask depends on the source operator. Therefore, the mapping of Split to KeyGroup would be persistently restored and extracted for redistribution.

Conclusion