Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The specific transformation relationship is shown in the following table:

Partitioning

Output

Global

Keyed

NonKeyed

Broadcast

Input

Global

KeyBy

Shuffle

Broadcast

Keyed

Global

KeyBy

Shuffle

Broadcast

NonKeyed

Global

KeyBy

Shuffle

Broadcast

Broadcast

(A crossed box indicates that it is not supported or not required)

...

According to the number of input / output, they are classified as follows:

Process Function

number of inputs

number of outputs

OneInputStreamProcessFunction

1

1

TwoInputStreamProcessFunction

2

1

TwoOutputStreamProcessFunction

1

2

Logically, process functions that support more inputs and outputs can be achieved by combining them, but this implementation might be inefficient. If the call for this becomes louder, we will consider supporting as many output edges as we want through a mechanism like OutputTag. But this loses the explicit generic type information that comes with using ProcessFunction.

...

For OneInputStreamProcessFunction:

Input Stream

Output Stream

Global

Global

Keyed

Keyed / Non-Keyed

NonKeyed

NonKeyed

Broadcast

Not Supported

When KeyedPartitionStream is used as input, the output can be either a KeyedPartitionStream or NonKeyedPartitionStream. For general data processing logic, how to partition data is uncertain, we can only expect a NonKeyedPartitionStream. If we do need a deterministic partition, we can follow it with a KeyBy partitioning. However, there are times when we know for sure that the partition of records will not change before and after processing, shuffle cost due to the extra partitioning can be avoided. To be safe, in this case we ask for a KeySelector for the output data, and the framework checks at runtime to see if this invariant is broken. The same is true for two output and two input counterparts. For a more detailed explanation, see the API definition of KeyedPartitionStream in the Proposed Changes section below.

...

For TwoOutputStreamProcessFunction:

Input Stream

Output Stream

Global

Global  + Global

Keyed

Keyed + Keyed / Non-Keyed + Non-Keyed

NonKeyed

NonKeyed + NonKeyed

Broadcast

Not Supported

There are two points to note here:

...

A cross(❎) indicates not supported.

Output

Input2

Global

Keyed

NonKeyed

Broadcast

Input1

Global

Global

Keyed

Non-Keyed / Keyed

Non-Keyed

NonKeyed

Non-Keyed

Non-Keyed

Broadcast

Non-Keyed

Non-Keyed

  1. The reason why the connection between Global Stream and Non-Global Stream is not supported is that the number of partitions of GlobalStream is forced to be 1, but it is generally not 1 for Non-Global Stream, which will cause conflicts when determining the number of partitions of the output stream. If necessary, they should be transformed into mutually compatible streams and then connected.
  2. Connecting two broadcast streams doesn't really make sense, because each parallelism would have exactly same input data from both streams and any process would be duplicated. 
  3. The reason why the output of two keyed partition streams can be keyed or non-keyed is the same as we mentioned above in the case of single input.
  4. When we connect two KeyedPartitionStream, they must have the same key type, otherwise we can't decide how to merge the partitions of the two streams. At the same time, things like access state and register timer are also restricted to the partition itself, cross-partition interaction is not meaningful.

  5. The reasons why the connection between KeyedPartitionStream and NonKeyedPartitionStream is not supported are as follows:
    1. The data on KeyedStream is deterministic, but on NonKeyed is not. It is difficult to think of a scenario where the two need to be connected.
    2. This will complicate the state declaration and access rules. A more detailed discussion can be seen in the subsequent state-related sub-FLIP.
    3. If we see that most people have clear demands for this, we can support it in the future.

...

RuntimeContext contains information about the context in which process functions are executed. It is currently just an empty interface but will be expanded later, such as supporting access to state, registering timers, etc. This part will be elaborated in the subsequent sub-FLIPs.

...

We can see that each process function provides the life-cycle hook for endInput. The runtime engine will call back this method after processing all data of this input, providing the final opportunity to send data to downstream. This is crucial for implementing something like full-aggregation window.

DataStreams

In general, we will expose 4 types of DataStream interfaces to users, partitioning and process can be applied to these data streams.

NonKeyedPartitionedStream

Code Block
languagejava
titleNonKeyedPartitionStream.java
/**
 * This class represents a kind of partitioned data stream. For this stream, each parallelism is a
 * partition, and the partition to which the data belongs is random.
 */
public interface NonKeyedPartitionStream<T> {
    /**
     * Apply an operation to this {@link NonKeyedPartitionStream};
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <OUT> NonKeyedPartitionStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two output operation to this {@link NonKeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @return new stream with this operation.
     */
    <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply to a two input operation on this and other {@link NonKeyedPartitionStream}.
     *
     * @param other {@link NonKeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            NonKeyedPartitionStream<T_OTHER> other,
            TwoInputStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link BroadcastStream}.
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            BroadcastStream<T_OTHER> other,
            TwoInputStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Coalesce this stream to a {@link GlobalStream}.
     *
     * @return the coalesced global stream.
     */
    GlobalStream<T> coalesce();

    /**
     * Transform this stream to a {@link KeyedPartitionStream}.
     *
     * @param keySelector to decide how to map data to partition.
     * @return the transformed stream partitioned by key.
     */
    <K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector);

    /**
     * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between
     * these two streams.
     *
     * @return the transformed stream after shuffle.
     */
    NonKeyedPartitionStream<T> shuffle();

    /**
     * Transform this stream to a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    /**
     * Sink data from this stream.
     *
     * @param sink to receive data from this stream.
     */
    void toSink(Sink<T> sink);

    /**
     * This class represents a combination of two {@link NonKeyedPartitionStream}. It will be used
     * as the return value of operation with two output.
     */
    interface TwoNonKeyedPartitionStreams<T1, T2> {
        /** Get the first stream. */
        NonKeyedPartitionStream<T1> getFirst();

        /** Get the second stream. */
        NonKeyedPartitionStream<T2> getSecond();
    }
}

KeyedPartitionedStream

Code Block
languagejava
titleKeyedPartitionStream.java
/**
 * This class represents a kind of partitioned data stream. For this stream, Each key group is a
 * partition, and the partition to which the data belongs is determined.
 */
public interface KeyedPartitionStream<K, T> {
    /**
     * Apply an operation to this {@link KeyedPartitionStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the same record, the new {@link KeySelector} must extract the same key as the
     * original {@link KeySelector} on this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <OUT> KeyedPartitionStream<K, OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);

    /**
     * Apply an operation to this {@link KeyedPartitionStream};
     *
     * @param processFunction to perform operation.
     * @return new {@link NonKeyedPartitionStream} with this operation.
     */
    <OUT> NonKeyedPartitionStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two output operation to this {@link KeyedPartitionStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the same record, these new two {@link KeySelector}s must extract the same key as the
     * original {@link KeySelector}s on this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @param keySelector1 to select the key of first output.
     * @param keySelector2 to select the key of second output.
     * @return new {@link TwoKeyedPartitionStream} with this operation.
     */
    <OUT1, OUT2> TwoKeyedPartitionStreams<K, OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction,
            KeySelector<OUT1, K> keySelector1,
            KeySelector<OUT2, K> keySelector2);

    /**
     * Apply a two output operation to this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @return new {@link TwoNonKeyedPartitionStream} with this operation.
     */
    <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}. The two keyed
     * streams must have the same partitions, otherwise it makes no sense to connect them.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new {@link NonKeyedPartitionStream} with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputStreamProcessFunction<T, T_OTHER, OUT> processFunction);

     /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}.The two keyed
     * streams must have the same partitions, otherwise it makes no sense to connect them.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the same record, the new {@link KeySelector} must extract the same key as the
     * original {@link KeySelector}s on these two {@link KeyedPartitionStream}s.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputStreamProcessFunction<T, T_OTHER, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);

    /**
     * Apply a two input operation to this and other {@link BroadcastStream}.
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            BroadcastStream<T_OTHER> other,
            TwoInputStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Coalesce this stream to a {@link GlobalStream}.
     *
     * @return the coalesced global stream.
     */
    GlobalStream<T> coalesce();

    /**
     * Transform this stream to a new {@link KeyedPartitionStream}.
     *
     * @param keySelector to decide how to map data to partition.
     * @return the transformed stream partitioned by key.
     */
    <NEW_KEY> KeyedPartitionStream<NEW_KEY, T> keyBy(KeySelector<T, NEW_KEY> keySelector);

    /**
     * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between
     * these two streams.
     *
     * @return the transformed stream after shuffle.
     */
    NonKeyedPartitionStream<T> shuffle();

    /**
     * Transform this stream to a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    /**
     * Sink data from this stream.
     *
     * @param sink to receive data from this stream.
     */
    void toSink(Sink<T> sink);

    /**
     * This class represents a combination of two {@link KeyedPartitionStream}. It will be used as
     * the return value of operation with two output.
     */
    interface TwoKeyedPartitionStreams<K, T1, T2> {
        /** Get the first stream. */
        KeyedPartitionStream<K, T1> getFirst();

        /** Get the second stream. */
        KeyedPartitionStream<K, T2> getSecond();
    }
}

GlobalStream

Code Block
languagejava
titleGlobalStream.java
/** This class represents a stream that force single parallelism. */
public interface GlobalStream<T> {
    /**
     * Apply an operation to this {@link GlobalStream};
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <OUT> GlobalStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two output operation to this {@link GlobalStream}.
     *
     * @param processFunction to perform two output operation.
     * @return new stream with this operation.
     */
    <OUT1, OUT2> TwoGlobalStream<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply a two input operation to this and other {@link GlobalStream}.
     *
     * @param other {@link GlobalStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> GlobalStream<OUT> connectAndProcess(
            GlobalStream<T_OTHER> other,
            TwoInputStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Transform this stream to a {@link KeyedPartitionStream}.
     *
     * @param keySelector to decide how to map data to partition.
     * @return the transformed stream partitioned by key.
     */
    <K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector);

    /**
     * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between
     * these two streams.
     *
     * @return the transformed stream after shuffle.
     */
    NonKeyedPartitionStream<T> shuffle();

    /**
     * Transform this stream to a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    /**
     * Sink data from this stream.
     *
     * @param sink to receive data from this stream.
     */
    void toSink(Sink<T> sink);

    /**
     * This class represents a combination of two {@link GlobalStream}. It will be used as the
     * return value of operation with two output.
     */
    interface TwoGlobalStream<T1, T2> {
        /** Get the first stream. */
        GlobalStream<T1> getFirst();

        /** Get the second stream. */
        GlobalStream<T2> getSecond();
    }
}

BroadcastStream

Code Block
languagejava
titleBroadcastStream.java
/** This class represents a stream that each parallel task processes the same data. */
public interface BroadcastStream<T> {
    /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <K, T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link NonKeyedPartitionStream}.
     *
     * @param other {@link NonKeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            NonKeyedPartitionStream<T_OTHER> other,
            TwoInputStreamProcessFunction<T, T_OTHER, OUT> processFunction);
}

Similarly to source, we only supports sinkV2 based sink.

...