...

Sometimes the current partition cannot be determined from the processing context, for example when processing input records from a broadcast edge. Therefore, we introduce a mechanism to process all partitions.

Note: RuntimeContext contains information about the context in which process functions are executed. It is currently just an empty interface but will be expanded later, for example to support state access and timer registration. This part will be elaborated in the subsequent sub-FLIPs.

Code Block
languagejava
titleCollector.java
/**
 * This interface represents the context associated with all operations that must be applied to
 * all partitions.
 */
public interface NonPartitionedContext<OUT> extends RuntimeContext {
    /**
     * Apply a function to all partitions. For a keyed stream, it will apply to all keys. For a
     * non-keyed stream, it will apply to the single partition.
     */
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}

/** A function to be applied to all partitions. */
@FunctionalInterface
public interface ApplyPartitionFunction<OUT> {
    /**
     * The actual method to be applied to each partition.
     *
     * @param collector to output data.
     * @param ctx runtime context in which this function is executed.
     */
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}

/**
 * This interface represents the context associated with all operations that must be applied to
 * all partitions with two outputs.
 */
public interface TwoOutputNonPartitionedContext<OUT1, OUT2> extends RuntimeContext {
    /**
     * Apply a function to all partitions. For a keyed stream, it will apply to all keys. For a
     * non-keyed stream, it will apply to the single partition.
     */
    void applyToAllPartitions(TwoOutputApplyPartitionFunction<OUT1, OUT2> applyPartitionFunction);
}

/** A function to be applied to all partitions with two outputs. */
@FunctionalInterface
public interface TwoOutputApplyPartitionFunction<OUT1, OUT2> {
    /**
     * The actual method to be applied to each partition.
     *
     * @param firstOutput to emit record to first output.
     * @param secondOutput to emit record to second output.
     * @param ctx runtime context in which this function is executed.
     */
    void apply(Collector<OUT1> firstOutput, Collector<OUT2> secondOutput, RuntimeContext ctx)
            throws Exception;
}
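
To make the calling pattern concrete, here is a minimal sketch of how a runtime might drive `applyToAllPartitions`. The interfaces are trimmed stand-ins for the ones proposed above so that the block compiles on its own. `ToyNonPartitionedContext` and `ApplyToAllPartitionsDemo` are hypothetical names invented for illustration, and modelling each partition as an in-memory list is an assumption, not how the runtime is specified to work.

```java
import java.util.ArrayList;
import java.util.List;

// Trimmed stand-ins for the proposed interfaces, so the sketch is self-contained.
interface RuntimeContext {}

interface Collector<OUT> {
    void collect(OUT record);
}

@FunctionalInterface
interface ApplyPartitionFunction<OUT> {
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}

interface NonPartitionedContext<OUT> extends RuntimeContext {
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}

/** Hypothetical toy context: every partition is modelled as an in-memory output list. */
final class ToyNonPartitionedContext<OUT> implements NonPartitionedContext<OUT> {
    private final List<List<OUT>> partitionOutputs = new ArrayList<>();

    ToyNonPartitionedContext(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            partitionOutputs.add(new ArrayList<>());
        }
    }

    @Override
    public void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction) {
        // Invoke the function once per partition, each time with that partition's collector.
        for (List<OUT> partitionOutput : partitionOutputs) {
            try {
                applyPartitionFunction.apply(partitionOutput::add, this);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    List<List<OUT>> outputs() {
        return partitionOutputs;
    }
}

final class ApplyToAllPartitionsDemo {
    /** Emits one marker record into each of three toy partitions. */
    static List<List<String>> run() {
        ToyNonPartitionedContext<String> ctx = new ToyNonPartitionedContext<>(3);
        ctx.applyToAllPartitions((collector, runtimeCtx) -> collector.collect("flush"));
        return ctx.outputs();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The function passed in never learns which partition it is running for. It only receives that partition's collector and the runtime context, which matches the shape of `ApplyPartitionFunction#apply`.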

...

Code Block
languagejava
titleOneInputStreamProcessFunction.java
/** This class contains all logic related to processing records from a single input. */
public interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction {
    /**
     * Process record and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecord(IN record, Collector<OUT> output, RuntimeContext ctx) throws Exception;

    /**
     * This is a life-cycle method indicating that this function will no longer receive any input
     * data.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endInput(NonPartitionedContext<OUT> ctx) {}
}

TwoInputNonBroadcastStreamProcessFunction

Code Block
languagejava
titleTwoInputNonBroadcastStreamProcessFunction.java
/** This class contains all logic related to processing records from two non-broadcast inputs. */
public interface TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    /**
     * Process record from first input and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecordFromFirstInput(IN1 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    /**
     * Process record from second input and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecordFromSecondInput(IN2 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    /**
     * This is a life-cycle method indicating that this function will no longer receive any data
     * from the first input.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endFirstInput(NonPartitionedContext<OUT> ctx) {}

    /**
     * This is a life-cycle method indicating that this function will no longer receive any data
     * from the second input.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endSecondInput(NonPartitionedContext<OUT> ctx) {}
}

TwoInputBroadcastStreamProcessFunction

Code Block
languagejava
titleTwoInputBroadcastStreamProcessFunction.java
/**
 * This class contains all logic related to processing records from a broadcast stream and a
 * non-broadcast stream.
 */
public interface TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    /**
     * Process record from non-broadcast input and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecordFromNonBroadcastInput(IN1 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    /**
     * Process record from broadcast input. In general, the broadcast side is not allowed to
     * manipulate state and output data because it corresponds to all partitions instead of a
     * single partition. But you could use the {@link NonPartitionedContext} to process all the
     * partitions at once.
     *
     * @param record to process.
     * @param ctx the context in which this function is executed.
     */
    void processRecordFromBroadcastInput(IN2 record, NonPartitionedContext<OUT> ctx) throws Exception;

    /**
     * This is a life-cycle method indicating that this function will no longer receive any data
     * from the non-broadcast input.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endNonBroadcastInput(NonPartitionedContext<OUT> ctx) {}

    /**
     * This is a life-cycle method indicating that this function will no longer receive any data
     * from the broadcast input.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endBroadcastInput(NonPartitionedContext<OUT> ctx) {}
}
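
The asymmetry between the two inputs is easier to see in a small example. The sketch below is illustrative only: the interfaces are trimmed stand-ins for the proposed ones, `ThresholdFilter` and `BroadcastDemo` are hypothetical names, and the threshold is kept in a plain field because state access is left to later sub-FLIPs. The driver simulates a single partition.

```java
import java.util.ArrayList;
import java.util.List;

// Trimmed stand-ins for the proposed interfaces, so the sketch is self-contained.
interface RuntimeContext {}

interface Collector<OUT> {
    void collect(OUT record);
}

@FunctionalInterface
interface ApplyPartitionFunction<OUT> {
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}

interface NonPartitionedContext<OUT> extends RuntimeContext {
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}

interface TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> {
    void processRecordFromNonBroadcastInput(IN1 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    void processRecordFromBroadcastInput(IN2 record, NonPartitionedContext<OUT> ctx)
            throws Exception;
}

/** Forwards values above a threshold; the threshold arrives on the broadcast input. */
final class ThresholdFilter
        implements TwoInputBroadcastStreamProcessFunction<Integer, Integer, String> {
    // Plain field instead of managed state (an assumption made for this sketch).
    private int threshold = Integer.MAX_VALUE;

    @Override
    public void processRecordFromNonBroadcastInput(
            Integer record, Collector<String> output, RuntimeContext ctx) {
        if (record > threshold) {
            output.collect("value " + record);
        }
    }

    @Override
    public void processRecordFromBroadcastInput(
            Integer record, NonPartitionedContext<String> ctx) {
        threshold = record;
        // The broadcast side gets no Collector; any output must go through the context.
        ctx.applyToAllPartitions(
                (collector, runtimeCtx) -> collector.collect("threshold " + record));
    }
}

final class BroadcastDemo {
    /** Hypothetical driver that plays the role of the runtime for one partition. */
    static List<String> run() {
        List<String> emitted = new ArrayList<>();
        RuntimeContext runtimeCtx = new RuntimeContext() {};
        NonPartitionedContext<String> broadcastCtx = fn -> {
            try {
                fn.apply(emitted::add, runtimeCtx);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };

        ThresholdFilter filter = new ThresholdFilter();
        filter.processRecordFromNonBroadcastInput(7, emitted::add, runtimeCtx); // no threshold yet
        filter.processRecordFromBroadcastInput(5, broadcastCtx);
        filter.processRecordFromNonBroadcastInput(7, emitted::add, runtimeCtx);
        filter.processRecordFromNonBroadcastInput(3, emitted::add, runtimeCtx);
        return emitted;
    }
}
```

In this run the first `7` is dropped because no threshold has been broadcast yet, the broadcast record produces one notification through the context, and only the second `7` passes the filter.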

...

Code Block
languagejava
titleTwoOutputStreamProcessFunction.java
/** This class contains all logic related to processing and emitting records to two outputs. */
public interface TwoOutputStreamProcessFunction<IN, OUT1, OUT2> extends ProcessFunction {
    /**
     * Process and emit record to the first/second output through {@link Collector}s.
     *
     * @param record to process.
     * @param output1 to emit processed records to the first output.
     * @param output2 to emit processed records to the second output.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecord(
            IN record, Collector<OUT1> output1, Collector<OUT2> output2, RuntimeContext ctx);

    /**
     * This is a life-cycle method indicating that this function will no longer receive any input
     * data.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endInput(TwoOutputNonPartitionedContext<OUT1, OUT2> ctx) {}
}

...

Each process function provides a life-cycle hook for the end of input (endInput and its per-input variants). The runtime engine calls this method after it has processed all data of the corresponding input, which gives the function a final opportunity to send data downstream. This is crucial for implementing operations such as a full-aggregation window.
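
As an illustration of the end-of-input hook, the following sketch accumulates a sum while records arrive and emits it only once the input has ended. It is a sketch under stated assumptions: the interfaces are trimmed stand-ins for the proposed ones, `SumOnEndInput` and `EndInputDemo` are hypothetical names, the running sum lives in a plain field because state access is deferred to later sub-FLIPs, and the driver simulates a single partition.

```java
import java.util.ArrayList;
import java.util.List;

// Trimmed stand-ins for the proposed interfaces, so the sketch is self-contained.
interface RuntimeContext {}

interface Collector<OUT> {
    void collect(OUT record);
}

@FunctionalInterface
interface ApplyPartitionFunction<OUT> {
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}

interface NonPartitionedContext<OUT> extends RuntimeContext {
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}

interface OneInputStreamProcessFunction<IN, OUT> {
    void processRecord(IN record, Collector<OUT> output, RuntimeContext ctx) throws Exception;

    default void endInput(NonPartitionedContext<OUT> ctx) {}
}

/** Sums all records and emits the total once, when the input ends. */
final class SumOnEndInput implements OneInputStreamProcessFunction<Integer, Long> {
    // Plain field instead of managed state (an assumption made for this sketch).
    private long sum;

    @Override
    public void processRecord(Integer record, Collector<Long> output, RuntimeContext ctx) {
        sum += record; // accumulate only; nothing is emitted per record
    }

    @Override
    public void endInput(NonPartitionedContext<Long> ctx) {
        // Last chance to emit: send the aggregate downstream.
        ctx.applyToAllPartitions((collector, runtimeCtx) -> collector.collect(sum));
    }
}

final class EndInputDemo {
    /** Hypothetical driver that plays the role of the runtime for one partition. */
    static List<Long> run(List<Integer> input) {
        List<Long> emitted = new ArrayList<>();
        RuntimeContext runtimeCtx = new RuntimeContext() {};
        NonPartitionedContext<Long> endCtx = fn -> {
            try {
                fn.apply(emitted::add, runtimeCtx);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };

        SumOnEndInput function = new SumOnEndInput();
        for (Integer record : input) {
            function.processRecord(record, emitted::add, runtimeCtx);
        }
        function.endInput(endCtx); // called once, after the last record
        return emitted;
    }
}
```

Calling `EndInputDemo.run` with the records 1, 2 and 3 yields a single output record, the total 6, emitted from `endInput` rather than from `processRecord`.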

...

In general, we will expose 4 types of DataStream interfaces to users. Partitioning and processing can be applied to these data streams.

...

NonKeyedPartitionStream

Code Block
languagejava
titleNonKeyedPartitionStream.java
/**
 * This class represents a kind of partitioned data stream. For this stream, each parallelism is a
 * partition, and the partition to which the data belongs is random.
 */
public interface NonKeyedPartitionStream<T> {
    /**
     * Apply an operation to this {@link NonKeyedPartitionStream}.
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <OUT> NonKeyedPartitionStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two output operation to this {@link NonKeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @return new stream with this operation.
     */
    <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply a two input operation to this and other {@link NonKeyedPartitionStream}.
     *
     * @param other {@link NonKeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            NonKeyedPartitionStream<T_OTHER> other,
            TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link BroadcastStream}.
     *
     * @param other {@link BroadcastStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            BroadcastStream<T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Coalesce this stream to a {@link GlobalStream}.
     *
     * @return the coalesced global stream.
     */
    GlobalStream<T> global();

    /**
     * Transform this stream to a {@link KeyedPartitionStream}.
     *
     * @param keySelector to decide how to map data to partition.
     * @return the transformed stream partitioned by key.
     */
    <K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector);

    /**
     * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between
     * these two streams.
     *
     * @return the transformed stream after shuffle.
     */
    NonKeyedPartitionStream<T> shuffle();

    /**
     * Transform this stream to a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    /**
     * Sink data from this stream.
     *
     * @param sink to receive data from this stream.
     */
    void toSink(Sink<T> sink);

    /**
     * This class represents a combination of two {@link NonKeyedPartitionStream}. It will be used
     * as the return value of operation with two output.
     */
    interface TwoNonKeyedPartitionStreams<T1, T2> {
        /** Get the first stream. */
        NonKeyedPartitionStream<T1> getFirst();

        /** Get the second stream. */
        NonKeyedPartitionStream<T2> getSecond();
    }
}

...

KeyedPartitionStream

Code Block
languagejava
titleKeyedPartitionStream.java
/**
 * This class represents a kind of partitioned data stream. For this stream, each key group is a
 * partition, and the partition to which the data belongs is determined.
 */
public interface KeyedPartitionStream<K, T> {
    /**
     * Apply an operation to this {@link KeyedPartitionStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the same record, the new {@link KeySelector} must extract the same key as the
     * original {@link KeySelector} on this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <OUT> KeyedPartitionStream<K, OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);

    /**
     * Apply an operation to this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform operation.
     * @return new {@link NonKeyedPartitionStream} with this operation.
     */
    <OUT> NonKeyedPartitionStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two output operation to this {@link KeyedPartitionStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the same record, the two new {@link KeySelector}s must extract the same key as the
     * original {@link KeySelector} on this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @param keySelector1 to select the key of first output.
     * @param keySelector2 to select the key of second output.
     * @return new {@link TwoKeyedPartitionStreams} with this operation.
     */
    <OUT1, OUT2> TwoKeyedPartitionStreams<K, OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction,
            KeySelector<OUT1, K> keySelector1,
            KeySelector<OUT2, K> keySelector2);

    /**
     * Apply a two output operation to this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @return new {@link TwoNonKeyedPartitionStreams} with this operation.
     */
    <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}. The two keyed
     * streams must have the same partitions, otherwise it makes no sense to connect them.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new {@link NonKeyedPartitionStream} with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}. The two keyed
     * streams must have the same partitions, otherwise it makes no sense to connect them.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the same record, the new {@link KeySelector} must extract the same key as the
     * original {@link KeySelector}s on these two {@link KeyedPartitionStream}s.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);

    /**
     * Apply a two input operation to this and other {@link BroadcastStream}.
     *
     * @param other {@link BroadcastStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            BroadcastStream<T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link BroadcastStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the record from non-broadcast input, the new {@link KeySelector} must extract the
     * same key as the original {@link KeySelector} on the {@link KeyedPartitionStream}. For the
     * record from broadcast input, the output key is taken from the keyed partition itself
     * instead of the new key selector, so it is already safe.
     *
     * @param other {@link BroadcastStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
            BroadcastStream<T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);

    /**
     * Coalesce this stream to a {@link GlobalStream}.
     *
     * @return the coalesced global stream.
     */
    GlobalStream<T> global();

    /**
     * Transform this stream to a new {@link KeyedPartitionStream}.
     *
     * @param keySelector to decide how to map data to partition.
     * @return the transformed stream partitioned by key.
     */
    <NEW_KEY> KeyedPartitionStream<NEW_KEY, T> keyBy(KeySelector<T, NEW_KEY> keySelector);

    /**
     * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between
     * these two streams.
     *
     * @return the transformed stream after shuffle.
     */
    NonKeyedPartitionStream<T> shuffle();

    /**
     * Transform this stream to a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    /**
     * Sink data from this stream.
     *
     * @param sink to receive data from this stream.
     */
    void toSink(Sink<T> sink);

    /**
     * This class represents a combination of two {@link KeyedPartitionStream}. It will be used as
     * the return value of operation with two output.
     */
    interface TwoKeyedPartitionStreams<K, T1, T2> {
        /** Get the first stream. */
        KeyedPartitionStream<K, T1> getFirst();

        /** Get the second stream. */
        KeyedPartitionStream<K, T2> getSecond();
    }
}
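
The requirement that a new key selector must extract the same key as the original one can be checked on sample data. The helper below is a hypothetical illustration, not part of the proposed API: `KeyConsistencyCheck` and `KeyConsistencyDemo` are invented names, `java.util.function.Function` stands in for `KeySelector`, and the process function is simplified to a function from one input record to a list of output records.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class KeyConsistencyCheck {
    /**
     * Returns true when every output derived from a sampled input maps, under the new key
     * selector, to the same key that the original selector extracts from that input.
     */
    static <K, T, OUT> boolean preservesKey(
            List<T> samples,
            Function<T, K> originalKeySelector,
            Function<T, List<OUT>> processFunction,
            Function<OUT, K> newKeySelector) {
        for (T input : samples) {
            K expectedKey = originalKeySelector.apply(input);
            for (OUT output : processFunction.apply(input)) {
                if (!expectedKey.equals(newKeySelector.apply(output))) {
                    return false;
                }
            }
        }
        return true;
    }
}

final class KeyConsistencyDemo {
    // Records look like "alice:3"; the key is the text before the first colon.
    static final Function<String, String> KEY_OF = s -> s.substring(0, s.indexOf(':'));

    static final List<String> SAMPLES = Arrays.asList("alice:3", "bob:5");

    /** Appends a suffix, so the key prefix is untouched. */
    static boolean keyPreserving() {
        Function<String, List<String>> process = s -> Arrays.asList(s + ":seen");
        return KeyConsistencyCheck.preservesKey(SAMPLES, KEY_OF, process, KEY_OF);
    }

    /** Prepends a prefix, which changes the extracted key. */
    static boolean keyBreaking() {
        Function<String, List<String>> process = s -> Arrays.asList("out-" + s);
        return KeyConsistencyCheck.preservesKey(SAMPLES, KEY_OF, process, KEY_OF);
    }
}
```

A check like this only covers the sampled records. It can catch an obviously wrong selector in a test, but it does not prove the property for all inputs.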

...

Similar to sources, we only support SinkV2-based sinks.

The FLIP needs to move the following classes from flink-core into the flink-core-api module:

Class Full Path

org.apache.flink.api.common.functions.Function

org.apache.flink.api.java.functions.KeySelector

Compatibility, Deprecation, and Migration Plan

...