...

  • The input source stream is already partitioned in the external system.
    • For example, user data have already been grouped by the user ID in the external system.
  • The DataStream has already been grouped by key with a previous keyBy().
    • For example, a record with three fields, (key, val_1, val_2), can first be grouped by the key with the keyBy() operation. Then the data can be summed over val_1 and val_2 separately.
    • The job can be written as
      • fromSource().keyBy().sum().reinterpretAsKeyedStream().sum()

However, one limitation is that, for the first use case, if the input source stream is NOT partitioned exactly the same way Flink's keyBy would partition the data, the reinterpretation would cause runtime errors due to KeyGroup mismatch and state inconsistency during rescaling.
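
To make the mismatch concrete, the snippet below (a minimal sketch, not part of the proposal) compares Flink's own key-group assignment, computed by the existing KeyGroupRangeAssignment utility, with a hypothetical external partitioning scheme such as userId.hashCode() % numPartitions. The two rarely line up, which is exactly why a blind reinterpretation of an externally partitioned stream can hit the wrong KeyGroup. The partition counts and keys are illustrative only.

Code Block
languagejava
titleKeyGroupMismatchSketch
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupMismatchSketch {

    public static void main(String[] args) {
        int maxParallelism = 128;      // number of key groups on the Flink side
        int externalPartitions = 16;   // hypothetical partition count of the external system

        for (String userId : new String[] {"user-1", "user-2", "user-3"}) {
            // How Flink's keyBy() would place the key.
            int flinkKeyGroup = KeyGroupRangeAssignment.assignToKeyGroup(userId, maxParallelism);
            // How the external system might have placed the key (hypothetical scheme).
            int externalPartition = Math.abs(userId.hashCode() % externalPartitions);

            System.out.printf("%s -> Flink key group %d, external partition %d%n",
                    userId, flinkKeyGroup, externalPartition);
        }
    }
}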

...

Code Block
languagejava
titleDataStream
/**
 * Reinterprets the given {@link DataStream} as a {@link KeyedStream}, which extracts keys with
 * the given {@link KeySelector}.
 *
 * <p>With the reinterpretation, the upstream operators can now be chained with the downstream operator and therefore
 * avoid shuffling data and the associated network consumption between two subtasks. If shuffling or network consumption is acceptable,
 * {@link DataStream#keyBy(KeySelector)}
 * is recommended.
 *
 * <p>IMPORTANT: If the base stream is not inside the SourceTask, this method works well if the following conditions
 * are satisfied.
 *
 * <ul>
 *     <li>The base stream has previously been partitioned with the {@link DataStream#keyBy(KeySelector)}.
 *     <li>The values of the selected keys of the base stream have not changed since {@link DataStream#keyBy(KeySelector)}.
 *     <li>No data shuffling since {@link DataStream#keyBy(KeySelector)}.
 * </ul>
 *
 * <p>Overall, the data stream has to be partitioned exactly in the same way as if it was
 * created through a {@link DataStream#keyBy(KeySelector)}.
 *
 * <p>IMPORTANT: If the base stream is inside the SourceTask, which means that the data stream from the external
 * system has been partitioned and is a so-called "pre-KeyedStream", this method works well if the following conditions
 * are satisfied.
 *
 * <ul>
 *     <li>The base stream has been partitioned in the external system. But the stream does not need to be
 *     partitioned exactly in the same way Flink's keyBy would partition the data.
 *     <li>The selected keys of the base stream can only exist in one {@link SourceSplit}. If a key exists in multiple
 *     {@link SourceSplit}s, the stream should not be called a "pre-KeyedStream", and therefore using this method could
 *     cause state inconsistency.
 *     <li>The values of the selected keys of the base stream remain the same as in the input source stream from the external system.
 * </ul>
 *
 * <p>IMPORTANT: For a "pre-KeyedStream", the maximum number of currently existing splits must not be larger than the maximum
 * number of key-groups, since every split would be mapped to one key-group. To configure the number of key-groups, please configure the
 * maximum supported parallelism.
 *
 * @param keySelector Function that defines how keys are extracted from the data stream.
 * @param <K> Type of the extracted keys.
 * @return The reinterpretation of the {@link DataStream} as a {@link KeyedStream}.
 */
public <K> KeyedStream<T, K> reinterpretAsKeyedStream(KeySelector<T, K> keySelector) {}


/**
 * Reinterprets the given {@link DataStream} as a {@link KeyedStream}, which extracts keys with
 * the given {@link KeySelector}.
 *
 * <p>With the reinterpretation, the upstream operators can now be chained with the downstream operator and therefore
 * avoid shuffling data between two subtasks. If shuffling is acceptable, {@link DataStream#keyBy(KeySelector)}
 * is recommended.
 *
 * <p>IMPORTANT: If the base stream is not inside the SourceTask, this method works well if the following conditions
 * are satisfied.
 *
 * <ul>
 *     <li>The base stream has previously been partitioned with the {@link DataStream#keyBy(KeySelector)}.
 *     <li>The values of the selected keys of the base stream have not changed since {@link DataStream#keyBy(KeySelector)}.
 *     <li>No data shuffling since {@link DataStream#keyBy(KeySelector)}.
 * </ul>
 *
 * <p>Overall, the data stream has to be partitioned exactly in the same way as if it was
 * created through a {@link DataStream#keyBy(KeySelector)}.
 *
 * <p>IMPORTANT: If the base stream is inside the SourceTask, which means that the data stream from the external
 * system has been partitioned and is a so-called "pre-KeyedStream", this method works well if the following conditions
 * are satisfied.
 *
 * <ul>
 *     <li>The base stream has been partitioned in the external system. The stream does not need to be
 *     partitioned exactly in the same way Flink's keyBy would partition the data.
 *     <li>The selected keys of the base stream can only exist in one {@link SourceSplit}. If a key exists in multiple
 *     {@link SourceSplit}s, the stream should not be called a "pre-KeyedStream", and therefore using this method could
 *     cause state inconsistency.
 *     <li>The values of the selected keys of the base stream remain the same as in the input source stream from the external system.
 * </ul>
 *
 * <p>IMPORTANT: For a "pre-KeyedStream", the maximum number of currently existing splits must not be larger than the maximum
 * number of key-groups, since every split would be mapped to one key-group. To configure the number of key-groups, please configure the
 * maximum supported parallelism.
 *
 * @param keySelector Function that defines how keys are extracted from the data stream.
 * @param typeInfo Explicit type information about the key type.
 * @param <K> Type of the extracted keys.
 * @return The reinterpretation of the {@link DataStream} as a {@link KeyedStream}.
 */
public <K> KeyedStream<T, K> reinterpretAsKeyedStream(KeySelector<T, K> keySelector, TypeInformation<K> typeInfo) {}


Usage Examples

Code Block
DataStream<Tuple3<String, Integer, Integer>> input = fromSource(...);

// If the input source stream is not a pre-KeyedStream:
// the two aggregation operators would be chained together.
DataStream<Tuple3<String, Integer, Integer>> chainedAggregations =
        input.keyBy(value -> value.f0)
             .sum(1)
             .reinterpretAsKeyedStream(value -> value.f0)
             .sum(2);

// If the input source stream is a pre-KeyedStream:
// the source operator and the following aggregation operators would be chained together.
DataStream<Tuple3<String, Integer, Integer>> chainedWithSource =
        input.reinterpretAsKeyedStream(value -> value.f0)
             .sum(1)
             .reinterpretAsKeyedStream(value -> value.f0)
             .sum(2);


Proposed Changes

Before everything else, we need to give a definition of pre-KeyedStream.

  • The key of a record can appear in one and only one split during the lifecycle of a job.
  • A pre-KeyedStream means that the external system has already partitioned the data stream w.r.t. the selected key.

There are two options for solving the problems introduced above.

Option 1. Re-define KeyGroupRangeAssignment

The KeyGroupRangeAssignment does two main things (see the sketch after this list).

  • Assigns a key group for a key.
  • Assigns the key to a downstream operator based on its key group and the predefined downstream operator's key group range.
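
As a reference, here is a minimal sketch of these two steps using the existing KeyGroupRangeAssignment utilities; the parallelism values and the key are example numbers only.

Code Block
languagejava
titleKeyGroupAssignmentSteps
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupAssignmentSteps {

    public static void main(String[] args) {
        int maxParallelism = 128; // total number of key groups
        int parallelism = 4;      // parallelism of the downstream operator

        Object key = "user-42";

        // Step 1: assign a key group for the key.
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);

        // Step 2: map the key group to a downstream operator (subtask index),
        // based on the predefined key group ranges of the downstream operator.
        int operatorIndex =
                KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);

        System.out.println("key group = " + keyGroup + ", downstream subtask = " + operatorIndex);
    }
}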

The input source stream is already grouped by key, but the external system would probably assign keys to splits in a different way from Flink; for example, the first 100 keys seen might be assigned to the first split. Therefore, we could try to replace the key group assignment with a user-provided one to avoid the problem of the KeyGroup mismatch. Also, we have to make sure that keys from different splits are not assigned to the same key group since, during rescaling, the state of the source operator has to keep pace with the keyed state of downstream operators.

Therefore, users need to provide the following information so that the downstream operators can maintain the keyed state correctly. 

  • Records from the same split should be assigned to the same key group.
  • Records from different splits should be assigned to different key groups.

Overall, the user has to provide the following mapping for any record,

                                                        Record → Key → SplitID → KeyGroup
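
Under this option, the user-facing contract might look roughly like the hypothetical interface below. The name SplitKeyGroupAssigner and its methods are illustrative only, not a proposed API; the user can realistically only describe the Record → Key → SplitID part of the chain, while the SplitID → KeyGroup part would still have to be established by Flink.

Code Block
languagejava
titleSplitKeyGroupAssigner (hypothetical)
/**
 * Hypothetical illustration of what a user-provided assignment for Option 1 might look like.
 *
 * @param <T> The record type.
 * @param <K> The key type.
 */
public interface SplitKeyGroupAssigner<T, K> extends java.io.Serializable {

    /** Extracts the key from a record, as a KeySelector would. */
    K getKey(T record);

    /** Returns the ID of the split that the given key belongs to in the external system. */
    String getSplitId(K key);
}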

However, this could introduce the following problems.

  1. Users can only provide a function mapping from a key to a split, because the key group should be transparent to the user. Therefore, the mapping of Split to KeyGroup has to be established by Flink.
    1. However, the paradox is that, for a specific subtask, the key group range assignment is static while the split assignment is dynamic and differs across Sources. For example, the split assignment for the Kafka Source is push-based, which round-robins new splits to source readers. Another example is the File Source, whose split assignment is pull-based, which means that the split assignment is totally irregular.
    2. Therefore, it is also hard for Flink to build such a mapping statically.
  2. The mapping of a key to a split could still be hard for the user to provide, since it is usually done automatically by the external system, e.g., Kafka.
  3. The maximum number of currently existing splits must not be larger than the maximum number of key groups, because every split would be mapped to one key group. In other words, the key group range needs to be as large as the number of currently existing splits. However, the number of currently existing splits could be dynamic, while the number of key groups has to be pre-defined before the job starts. Therefore, users have to estimate the largest number of splits at any moment during the whole lifecycle of the job. For a pre-partitioned stream, the number of splits should be fixed to a small range or even a specific value, since the number of keys should be far smaller than the number of records in most scenarios.

Option 2. Assign the KeyGroup in the SourceOperator (preferred option)

In this option, we would like to address problems 1 & 2 introduced above. The mapping of a key to a split and of a split to a key group can actually be established inside the Source Operator.

  • The mapping of a key to a split could be omitted because it is obvious to the Source Operator.
  • The mapping of a split to a key group could also be established inside the Source Operator. Since the parallelism of source operators and downstream operators is equal, the KeyGroupRange of downstream operators could be inferred in the source operators. Therefore, a new split added to the source operator would register an unused key group, and after that, all of the records from that split would be mapped to the registered key group.

As a result, the mapping of a key to a KeyGroup could be established inside the Source Operator, but the problem is that we have to pass the key group of a record down to the downstream operators together with the record itself within the SourceTask. One solution is to create a new class called KeyedStreamRecord.

Code Block
languagejava
titleKeyedStreamRecord
/**
 * A stream record containing the keyed information.
 *
 * @param <T> The type encapsulated with the stream record.
 */
@Internal
public class KeyedStreamRecord<T> extends StreamRecord<T> {
    /** The key group that the key of this record belongs to. */
    private int keyGroup;

    public KeyedStreamRecord(T value, int keyGroup) {
        super(value);
        this.keyGroup = keyGroup;
    }

    public int getKeyGroup() {
        return keyGroup;
    }
}
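
To illustrate how the Source Operator could maintain the Split → KeyGroup mapping described above, here is a rough, hypothetical sketch. SplitKeyGroupRegistry is an illustrative name, not a proposed class; the only existing APIs it assumes are KeyGroupRangeAssignment and KeyGroupRange.

Code Block
languagejava
titleSplitKeyGroupRegistry (hypothetical sketch)
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

/** Hypothetical helper a source operator could use to map splits to key groups. */
public class SplitKeyGroupRegistry {

    private final KeyGroupRange keyGroupRange;
    private final Map<String, Integer> splitToKeyGroup = new HashMap<>();
    private int nextOffset = 0;

    public SplitKeyGroupRegistry(int maxParallelism, int parallelism, int subtaskIndex) {
        // The source operator and the downstream operators run with the same parallelism,
        // so the downstream KeyGroupRange of this subtask can be inferred here.
        this.keyGroupRange =
                KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
                        maxParallelism, parallelism, subtaskIndex);
    }

    /** Registers a newly added split against the next unused key group of this subtask. */
    public int register(String splitId) {
        return splitToKeyGroup.computeIfAbsent(splitId, id -> {
            if (nextOffset >= keyGroupRange.getNumberOfKeyGroups()) {
                throw new IllegalStateException(
                        "More splits than key groups; increase the maximum parallelism.");
            }
            return keyGroupRange.getStartKeyGroup() + nextOffset++;
        });
    }

    /** Returns the key group previously registered for the given split. */
    public int keyGroupOf(String splitId) {
        return splitToKeyGroup.get(splitId);
    }
}

With such a helper, a source reader could then emit each record as, for example, new KeyedStreamRecord<>(record, registry.keyGroupOf(splitId)) instead of a plain StreamRecord.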

Passing the key group along with each record within the SourceTask adds a small overhead, but I believe this is acceptable to the user: since the source operators and downstream operators are now chained together, although memory usage could increase a bit, the network consumption is saved.

Rescaling

...

For a "pre-KeyedStream", the redistribution of keyed state of downstream operators of SourceTask depends on the source operator. Therefore, the mapping of Split to KeyGroup would be persistently restored and extracted for redistribution.

Conclusion

...


Test Plan

Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

...