Status

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As described in FLINK-8571 ("Provide an enhanced KeyedStream implementation to use ForwardPartitioner"), a DataStream can be reinterpreted as a KeyedStream with a ForwardPartitioner in the following two use cases.

  • The input source stream is already partitioned in the external system.
    • For example, user data have already been grouped by the user ID in the external system.
  • The DataStream is already grouped by key previously with the keyBy().
    • For example, a record with three fields, (key, val_1, val_2), can first be grouped by key with the keyBy() operation. Then, val_1 and val_2 can be summed separately.
  • Some use cases in machine learning (ML) are case 1, case 2, and case 3.

However, the first use case has a limitation: if the input source stream is NOT partitioned exactly the way Flink's keyBy() would partition the data, the reinterpretation causes a runtime error due to a KeyGroup mismatch.


In this example, according to the hash function used by Flink, key_0 belongs to key group 64. However, it is forwarded to a downstream operator that is only responsible for key groups 0 to 63, not 64. That operator then tries to access and update the state of key group 64, which it is not responsible for, and a runtime error is triggered.
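To make the mismatch concrete, the following Java sketch reproduces the arithmetic Flink uses (in `KeyGroupRangeAssignment`) to divide the key groups into contiguous, inclusive ranges, one per parallel subtask. The class and method names are hypothetical, and the numbers below assume a maximum parallelism of 128 and a parallelism of 2, which matches the 0 to 63 range in the example.

```java
/**
 * Hypothetical helper mirroring the arithmetic Flink uses to give each parallel
 * subtask a contiguous, inclusive range of key groups.
 */
final class KeyGroupRanges {

    private KeyGroupRanges() {}

    /** Returns {start, end} (both inclusive) of the key groups owned by one subtask. */
    static int[] rangeForSubtask(int maxParallelism, int parallelism, int subtaskIndex) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** True if the given subtask is responsible for the given key group. */
    static boolean owns(int maxParallelism, int parallelism, int subtaskIndex, int keyGroup) {
        int[] range = rangeForSubtask(maxParallelism, parallelism, subtaskIndex);
        return keyGroup >= range[0] && keyGroup <= range[1];
    }
}
```

Under these assumptions, `rangeForSubtask(128, 2, 0)` yields `{0, 63}`, so `owns(128, 2, 0, 64)` is `false`: a record of key group 64 that is forwarded to subtask 0 lands on a subtask that holds no state for that key group.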


This FLIP aims to address this problem. In addition, we may formally introduce reinterpretAsKeyedStream() to the DataStream API so that more users become aware of it.

Public Interfaces

DataStream
/**
 * Reinterprets the given {@link DataStream} as a {@link KeyedStream}, which extracts keys with
 * the given {@link KeySelector}.
 *
 * <p>IMPORTANT: The precondition is that the base stream has either been partitioned by
 * {@link DataStream#keyBy(KeySelector)} or is a "Pre-KeyedStream". In addition, the selected
 * key and the partitioning of the base stream must have remained unchanged before this
 * transformation.
 *
 * <p>"Pre-KeyedStream": The base stream is read from an external system, where it has already
 * been grouped by key and partitioned based on {@link SourceSplit}.
 *
 * <p>For a "Pre-KeyedStream", the maximum number of currently existing splits must not be
 * larger than the maximum number of key groups, since every split is mapped to one key group.
 * To configure the number of key groups, configure the maximum supported parallelism.
 *
 * <p>IMPORTANT: For a "Pre-KeyedStream", this method might not work if chaining is disabled
 * for debugging.
 *
 * <p>With the reinterpretation, the upstream operators can be chained with the downstream
 * operator, thereby avoiding data shuffling and network consumption between two subtasks.
 * If shuffling or network consumption is acceptable, {@link DataStream#keyBy(KeySelector)} is
 * recommended.
 *
 * @param keySelector Function that defines how keys are extracted from the data stream.
 * @param <K> Type of the extracted keys.
 * @return The reinterpretation of the {@link DataStream} as a {@link KeyedStream}.
 */
public <K> KeyedStream<T, K> reinterpretAsKeyedStream(KeySelector<T, K> keySelector) {}

/**
 * @see #reinterpretAsKeyedStream(KeySelector)
 *
 * @param keySelector Function that defines how keys are extracted from the data stream.
 * @param typeInfo Explicit type information about the key type.
 * @param <K> Type of the extracted keys.
 * @return The reinterpretation of the {@link DataStream} as a {@link KeyedStream}.
 */
public <K> KeyedStream<T, K> reinterpretAsKeyedStream(KeySelector<T, K> keySelector, TypeInformation<K> typeInfo) {}

Usage Examples

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple3<String, Integer, Integer>> input = env.fromSource(...);

// If the input source stream is NOT a pre-KeyedStream,
// the two aggregation operators are chained together by default.
DataStream<Tuple3<String, Integer, Integer>> sums = input
        .keyBy(value -> value.f0)
        .sum(1)
        .reinterpretAsKeyedStream(value -> value.f0)
        .sum(2);

// If the input source stream IS a pre-KeyedStream,
// the source operator and the following aggregation operators are chained together by default.
DataStream<Tuple3<String, Integer, Integer>> preKeyedSums = input
        .reinterpretAsKeyedStream(value -> value.f0)
        .sum(1)
        .reinterpretAsKeyedStream(value -> value.f0)
        .sum(2);

Proposed Changes

First of all, let us define the "pre-KeyedStream":

  • The input source stream has been partitioned into different "SourceSplits" (e.g., files or partitions) in the external system.
  • Each key of the input source stream exists in only one "SourceSplit".
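The second condition can be stated as a simple check over a sample of records: collect the keys observed in each split and verify that no key shows up in two different splits. The class, the method, and the map-based input below are hypothetical and only illustrate the definition; they are not part of the proposed API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical check of the "pre-KeyedStream" property on observed records. */
final class PreKeyedCheck {

    private PreKeyedCheck() {}

    /**
     * Returns true if no key was observed in more than one split.
     *
     * @param keysPerSplit keys observed in each split, indexed by split ID
     */
    static <K> boolean isPreKeyed(Map<String, List<K>> keysPerSplit) {
        Map<K, String> owningSplit = new HashMap<>();
        for (Map.Entry<String, List<K>> split : keysPerSplit.entrySet()) {
            for (K key : split.getValue()) {
                String previous = owningSplit.putIfAbsent(key, split.getKey());
                if (previous != null && !previous.equals(split.getKey())) {
                    return false; // the same key appears in two different splits
                }
            }
        }
        return true;
    }
}
```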

There are two options for solving the problem introduced above.

"Since the problem is caused by the pre-KeyedStream, all of the following discussion and proposed modifications are limited to the SourceTask."

Option 1. Re-define KeyGroupRangeAssignment

KeyGroupRangeAssignment does two main things:

  • Assigns a key group to a key.
  • Assigns the key to a downstream operator based on its key group and the predefined key group ranges of the downstream operators.
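The two steps can be sketched as follows. This is a hypothetical, simplified model of `KeyGroupRangeAssignment`: the key-group-to-operator formula follows Flink's, but for step 1 Flink applies its own murmur-based hash (`MathUtils.murmurHash`) to `key.hashCode()`, whereas this sketch uses the MurmurHash3 32-bit finalizer as a stand-in, so the concrete group numbers will not match Flink's.

```java
/** Hypothetical, simplified model of the two steps of KeyGroupRangeAssignment. */
final class KeyGroupAssignmentSketch {

    private KeyGroupAssignmentSketch() {}

    /**
     * Step 1: key -> key group in [0, maxParallelism).
     * The bit mixing below is the MurmurHash3 finalizer, a stand-in for Flink's hash.
     */
    static int assignToKeyGroup(Object key, int maxParallelism) {
        int h = key.hashCode();
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return Math.floorMod(h, maxParallelism);
    }

    /** Step 2: key group -> index of the downstream subtask whose range contains it. */
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }
}
```

Because step 2 is a pure function of the key group, any external partitioning that disagrees with step 1 sends records to a subtask whose range does not contain their key group.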

Since the input source stream is already grouped by key, and the external system probably assigns keys to splits differently from Flink, we could replace the key group assignment with a user-provided one to avoid the KeyGroup mismatch introduced above. We also have to make sure that keys from different splits are never assigned to the same key group, because during rescaling the state of the source operator has to stay consistent with the keyed state of the downstream operators.

Therefore, the user-provided assignment must satisfy the following requirements so that the downstream operators can maintain the keyed state correctly:

  • Records from the same split must be assigned to the same key group.
  • Records from different splits must be assigned to different key groups.

Overall, the following mapping has to be provided for every record:

                                                        Record → Key → SplitID → KeyGroup

As we know, the KeyGroupRange of each operator is computed statically before the job is executed, while splits are discovered and assigned dynamically at runtime. Depending on the connector, split assignment can be push-based or pull-based. Therefore, the mapping from Split to KeyGroup has to be dynamic, which means that users cannot provide Flink with a static mapping from Split to KeyGroup. Instead, Flink has to build and maintain the mapping at runtime.

Therefore, Flink should maintain a new "Assigner" that is responsible for the following:

  1. Whenever a split is added to or removed from the SourceOperator, the "Assigner" maps the new split to an unused KeyGroup, or deletes the mapping of the removed split.
  2. Given a key, it first computes the splitID the key belongs to with the user-provided function, and then returns the KeyGroup that this splitID maps to.
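A minimal, single-threaded sketch of such an "Assigner" for one subtask might look like the following. The class name, the `Function<K, String>` stand-in for the user's key-to-splitID function, and the choice to reuse the key group of a removed split are assumptions for illustration, not part of the proposal.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical sketch of the Option 1 "Assigner" for one subtask. It owns the inclusive
 * key-group range [firstKeyGroup, lastKeyGroup], gives each added split its own unused key
 * group, and resolves a key through the user-provided key -> splitID function.
 */
final class SplitKeyGroupAssigner<K> {

    private final Function<K, String> splitSelector; // user-provided: key -> splitID
    private final Deque<Integer> unusedKeyGroups = new ArrayDeque<>();
    private final Map<String, Integer> splitToKeyGroup = new HashMap<>();

    SplitKeyGroupAssigner(int firstKeyGroup, int lastKeyGroup, Function<K, String> splitSelector) {
        this.splitSelector = splitSelector;
        for (int g = firstKeyGroup; g <= lastKeyGroup; g++) {
            unusedKeyGroups.addLast(g);
        }
    }

    /** Responsibility 1a: a split was added, so map it to an unused key group. */
    int addSplit(String splitId) {
        Integer existing = splitToKeyGroup.get(splitId);
        if (existing != null) {
            return existing;
        }
        Integer keyGroup = unusedKeyGroups.pollFirst();
        if (keyGroup == null) {
            // More splits than key groups: the limitation shared by both options.
            throw new IllegalStateException("No unused key group left for split " + splitId);
        }
        splitToKeyGroup.put(splitId, keyGroup);
        return keyGroup;
    }

    /** Responsibility 1b: a split was removed, so delete its mapping. */
    void removeSplit(String splitId) {
        Integer keyGroup = splitToKeyGroup.remove(splitId);
        if (keyGroup != null) {
            // Assumption: the key group may be reused once its split is gone.
            unusedKeyGroups.addFirst(keyGroup);
        }
    }

    /** Responsibility 2: key -> splitID (user function) -> key group (this mapping). */
    int keyGroupFor(K key) {
        Integer keyGroup = splitToKeyGroup.get(splitSelector.apply(key));
        if (keyGroup == null) {
            throw new IllegalStateException("Key " + key + " maps to an unassigned split");
        }
        return keyGroup;
    }
}
```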

In the end, the functions users need to provide are:

                                                        Record → Key → SplitID

  1. The mapping from a record to its key can be done by a KeySelector, which the keyed state backend needs anyway to maintain the state.
  2. The mapping from a key to its SplitID is needed by the "Assigner" to assign the KeyGroup.

Therefore, the API could look like this:

DataStream
/**
 * ...
 * 
 *
 * @param keySelector Function that defines how keys are extracted from the data stream.
 * @param splitSelector Function that defines how a key is mapped to the splitID of the split it is partitioned to.
 * @param <K> Type of the extracted keys.
 * @return The reinterpretation of the {@link DataStream} as a {@link KeyedStream}.
 */
public <K> KeyedStream<T, K> reinterpretAsKeyedStream(KeySelector<T, K> keySelector, SplitSelector<K, String> splitSelector) {}

However, the mapping from a record or a key to its SplitID may be opaque to the user. For example, the data could be computed by one Flink job and then streamed into another Flink job through a message queue such as Kafka. In this case, neither the partitioner nor the splitID is exposed to users as a public API. Since this situation is common in many scenarios, it is hard, and possibly meaningless, for users to provide the "SplitSelector".
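To illustrate the difficulty, here is what a user-written SplitSelector might look like. The interface shape (`SplitSelector<K, S>` with a `getSplit` method) and the `topic-partition` split naming are hypothetical. The selector is only correct if it reproduces the producer's partitioner exactly; Kafka's default partitioner, for instance, hashes the serialized key bytes with murmur2, so a `hashCode()`-based selector like this one would generally not match it.

```java
import java.io.Serializable;

/** Hypothetical functional interface matching the proposed SplitSelector<K, String> parameter. */
@FunctionalInterface
interface SplitSelector<K, S> extends Serializable {
    S getSplit(K key);
}

/** Hypothetical selector: the user re-implements the producer's partitioning by hand. */
final class HashPartitionSplitSelector implements SplitSelector<String, String> {

    private final String topic;
    private final int numPartitions;

    HashPartitionSplitSelector(String topic, int numPartitions) {
        this.topic = topic;
        this.numPartitions = numPartitions;
    }

    @Override
    public String getSplit(String key) {
        // Only correct if the producer used exactly this hash and this split naming.
        return topic + "-" + Math.floorMod(key.hashCode(), numPartitions);
    }
}
```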



Option 2. Assign KeyGroup by the SourceOperator (preferred)

This option addresses the problems introduced above: both the key-to-split mapping and the split-to-key-group mapping can be established inside the Source Operator.

  • The key-to-split mapping need not be provided by users, because the Source Operator already knows which split each record is read from.
  • The split-to-key-group mapping can be established inside the Source Operator as well. Since the source operators and the downstream operators have the same parallelism, the source operators can infer the KeyGroupRange of the downstream operators. Each new split added to a source operator then registers an unused key group, and all records from that split are mapped to the registered key group.
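A hypothetical sketch of this registration inside one source subtask: the subtask derives the key-group range of its chained downstream operator from the maximum parallelism, the shared parallelism, and its own subtask index (using the same range arithmetic as Flink), and then hands one unused key group to each new split. The class and method names are illustrative only; note that no user-provided key-to-split function appears.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of Option 2: split -> key group registration inside a source subtask. */
final class SourceKeyGroupRegistry {

    private final Deque<Integer> unusedKeyGroups = new ArrayDeque<>();
    private final Map<String, Integer> splitToKeyGroup = new HashMap<>();

    SourceKeyGroupRegistry(int maxParallelism, int parallelism, int subtaskIndex) {
        // Same inclusive range the chained downstream keyed operator owns.
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        for (int g = start; g <= end; g++) {
            unusedKeyGroups.addLast(g);
        }
    }

    /** Called when the source subtask is assigned a new split. */
    void registerSplit(String splitId) {
        if (splitToKeyGroup.containsKey(splitId)) {
            return;
        }
        Integer keyGroup = unusedKeyGroups.pollFirst();
        if (keyGroup == null) {
            throw new IllegalStateException("More splits than key groups in this subtask");
        }
        splitToKeyGroup.put(splitId, keyGroup);
    }

    /** Key group attached to every record read from the given split. */
    int keyGroupOf(String splitId) {
        Integer keyGroup = splitToKeyGroup.get(splitId);
        if (keyGroup == null) {
            throw new IllegalStateException("Unknown split " + splitId);
        }
        return keyGroup;
    }
}
```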

As a result, the mapping from a key to a KeyGroup can be established inside the Source Operator. The remaining problem is that the key group of a record has to be passed down to the downstream operators together with the record itself. One solution is to create a new class called KeyedStreamRecord, which wraps the StreamRecord with a new field, KeyGroup.

KeyedStreamRecord
/**
 * A stream record containing the keyed information.
 *
 * @param <T> The type encapsulated with the stream record.
 */
@Internal
public class KeyedStreamRecord<T> extends StreamRecord<T> {
	private int keyGroup;
}

This adds a small overhead, but I believe it is acceptable to users: since the source operators and the downstream operators are now chained together, memory usage may increase slightly, but network consumption is saved.

Another motivation for such a wrapped KeyedStreamRecord is that, currently, the key and KeyGroup are selected and computed twice: once in the upstream operator to select a downstream channel, and once in the downstream operator to select the partitioned state. Passing the keyed information to the downstream operators avoids this duplication, because the downstream operators no longer need to hash the key to obtain its KeyGroup; the information is already contained in the record. In the future, whenever users work with a KeyedStream, whether obtained through reinterpretAsKeyedStream or keyBy, the KeyedStreamRecord could be used to reduce such duplicated selection and computation, since CPU load is usually a bigger bottleneck for a job than network consumption.

However, this is a separate topic. In this FLIP, the KeyedStreamRecord is only implemented in the SourceTask to address the limitation above.

One more modification is needed: because key groups were previously computed and allocated statically, KeyedStateBackend has no method for setting the key group explicitly. This FLIP adds one.

KeyedStateBackend
public interface KeyedStateBackend<K>
        extends KeyedStateFactory, PriorityQueueSetFactory, Disposable {

    // ... existing methods, such as setCurrentKey(K newKey) ...

    /**
     * Sets the current key and its corresponding KeyGroup that is used for partitioned state.
     *
     * @param newKey The new current key.
     * @param keyGroup The KeyGroup of the new current key.
     */
    void setCurrentKeyAndKeyGroup(K newKey, int keyGroup);
}
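To show what the new method changes, here is a hypothetical toy backend, unrelated to Flink's real heap or RocksDB backends, that keeps per-key value state grouped by key group and owns an inclusive key-group range. `setCurrentKey` derives the key group from the key (with a plain `hashCode()` modulo as a stand-in for Flink's hash), while `setCurrentKeyAndKeyGroup` accepts the key group carried by the record and skips hashing, but still rejects key groups outside its range.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical toy keyed backend illustrating an explicitly supplied key group. */
final class ToyKeyedBackend<K, V> {

    private final int firstKeyGroup;
    private final int lastKeyGroup;
    private final int maxParallelism;
    private final Map<Integer, Map<K, V>> stateByKeyGroup = new HashMap<>();
    private K currentKey;
    private int currentKeyGroup = -1;

    ToyKeyedBackend(int firstKeyGroup, int lastKeyGroup, int maxParallelism) {
        this.firstKeyGroup = firstKeyGroup;
        this.lastKeyGroup = lastKeyGroup;
        this.maxParallelism = maxParallelism;
    }

    /** Existing style: hash the key to find its key group (stand-in hash, not Flink's). */
    void setCurrentKey(K key) {
        setCurrentKeyAndKeyGroup(key, Math.floorMod(key.hashCode(), maxParallelism));
    }

    /** Proposed style: the key group arrives with the record, so no hashing is needed. */
    void setCurrentKeyAndKeyGroup(K key, int keyGroup) {
        if (keyGroup < firstKeyGroup || keyGroup > lastKeyGroup) {
            throw new IllegalArgumentException("Key group " + keyGroup + " is not owned by this backend");
        }
        this.currentKey = key;
        this.currentKeyGroup = keyGroup;
    }

    V value() {
        checkCurrentKey();
        Map<K, V> perKey = stateByKeyGroup.get(currentKeyGroup);
        return perKey == null ? null : perKey.get(currentKey);
    }

    void update(V value) {
        checkCurrentKey();
        stateByKeyGroup.computeIfAbsent(currentKeyGroup, g -> new HashMap<>()).put(currentKey, value);
    }

    private void checkCurrentKey() {
        if (currentKeyGroup < 0) {
            throw new IllegalStateException("No current key set");
        }
    }
}
```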

Overall, instead of hashing the key of a record to obtain its key group, the key group is now allocated by the SourceOperator if the input stream is a "pre-KeyedStream". This key group assignment is propagated only among the operators within the SourceTask. Because the partitioning between two tasks is generally all-to-all, which shuffles the data, an operator in a downstream task that still wants to take advantage of KeyedStream features has to repartition the data stream with keyBy().

Rescaling

For a "pre-KeyedStream", the redistribution of the keyed state of the operators chained after the source operator in the SourceTask depends on the source operator. Therefore, the mapping from Split to KeyGroup has to be persisted with the source operator's state and restored from it when the state is redistributed.
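A hypothetical sketch of that redistribution step, assuming the persisted state is simply a map from split ID to key group: on restore, each split is handed to the new subtask whose key-group range contains the split's key group, which is also the subtask that restores that key group's keyed state. The owning-subtask formula is the one Flink uses to map a key group to an operator index; the class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical redistribution of a persisted split -> key group mapping on rescale. */
final class SplitRedistribution {

    private SplitRedistribution() {}

    /** Returns, for each new subtask index, the splits (and their key groups) it takes over. */
    static List<Map<String, Integer>> redistribute(
            Map<String, Integer> splitToKeyGroup, int maxParallelism, int newParallelism) {
        List<Map<String, Integer>> perSubtask = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            perSubtask.add(new HashMap<>());
        }
        for (Map.Entry<String, Integer> entry : splitToKeyGroup.entrySet()) {
            // A split follows its key group, so source state and keyed state stay together.
            int subtask = entry.getValue() * newParallelism / maxParallelism;
            perSubtask.get(subtask).put(entry.getKey(), entry.getValue());
        }
        return perSubtask;
    }
}
```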

Conclusion

For Option 1, one obvious advantage is that the implementation could be simple: only KeyGroupRangeAssignment#assignToKeyGroup needs to be replaced with a dynamic one.

The disadvantage is that users need to provide a mapping from a key to a splitID, and this mapping is usually not a public API of the system that produced the data (e.g., Flink).

Option 2 is just the opposite of Option 1. Its advantages are as follows:

  • Users need not provide any information other than the KeySelector, which keeps the API clean.
  • Wrapping the StreamRecord could help reduce the duplicated extraction and computation of keys and KeyGroups.

The disadvantage is that the implementation could be complicated and sensitive, since it wraps the StreamRecord with a new field. I believe this is acceptable because the modification is limited to the SourceTask.

Both options share a common limitation: the number of splits must not be larger than the number of key groups, because ultimately we need to build a mapping that gives each split its own key group to ensure correctness during rescaling.

Compatibility, Deprecation, and Migration Plan

This improvement will not affect existing users.

Test Plan

Additional unit tests will be added to verify that the API works correctly when the input stream is a "Pre-KeyedStream".

Rejected Alternatives

No options have been rejected yet.