...

| Process Function | Number of inputs | Number of outputs |
| --- | --- | --- |
| OneInputStreamProcessFunction | 1 | 1 |
| TwoInputNonBroadcastStreamProcessFunction | 2 | 1 |
| TwoInputBroadcastStreamProcessFunction | 2 | 1 |
| TwoOutputStreamProcessFunction | 1 | 2 |

...

The two-input case is special, and we divide it into two categories:

  1. TwoInputNonBroadcastStreamProcessFunction: Neither of its inputs is a BroadcastStream, so processing is applied only to a single partition.
  2. TwoInputBroadcastStreamProcessFunction: One of its inputs is a BroadcastStream, so the processing of this input is applied to all partitions. The other input is a keyed or non-keyed stream, so its processing is applied to a single partition.

...

Code Block (java): Collector.java
/** This interface is responsible for collecting data to the output stream. */
public interface Collector<OUT> {
    /**
     * Collect a record to the output stream.
     *
     * @param record to be collected.
     */
    void collect(OUT record);

    /**
     * Overwrite the timestamp of this record and collect it to the output stream.
     *
     * @param record to be collected.
     * @param timestamp of the processed data.
     */
    void collectAndOverwriteTimestamp(OUT record, long timestamp);
}
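
To make the Collector contract concrete, the following is a minimal sketch of an in-memory implementation, for example for unit-testing a process function. The interface is re-declared locally so that the snippet compiles on its own; `ListCollector`, `Entry` and `NO_TIMESTAMP` are hypothetical names introduced for illustration and are not part of this FLIP.

```java
import java.util.ArrayList;
import java.util.List;

/** Local copy of the proposed interface so that the sketch compiles on its own. */
interface Collector<OUT> {
    void collect(OUT record);

    void collectAndOverwriteTimestamp(OUT record, long timestamp);
}

/** Hypothetical test helper (not part of the FLIP): buffers every emitted record. */
final class ListCollector<OUT> implements Collector<OUT> {
    /** Sentinel for "timestamp was not overwritten"; an assumption of this sketch. */
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    /** One emitted record together with the timestamp it was emitted with. */
    static final class Entry<T> {
        final T value;
        final long timestamp;

        Entry(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final List<Entry<OUT>> entries = new ArrayList<>();

    @Override
    public void collect(OUT record) {
        entries.add(new Entry<>(record, NO_TIMESTAMP));
    }

    @Override
    public void collectAndOverwriteTimestamp(OUT record, long timestamp) {
        entries.add(new Entry<>(record, timestamp));
    }

    /** Returns the buffered records in emission order. */
    List<Entry<OUT>> entries() {
        return entries;
    }
}
```

A real runtime collector would forward records to the downstream operator instead of buffering them; the buffer only makes the two emission paths easy to inspect.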

NonPartitionedContext

Sometimes the current partition cannot be determined in the processing context, for example when processing input records from a broadcast edge. Therefore, we introduce a mechanism to process all partitions.

Note: RuntimeContext contains information about the context in which process functions are executed. It is currently just an empty interface but will be expanded later, for example to support state access and timer registration. This part will be elaborated in subsequent sub-FLIPs.

Code Block (java): NonPartitionedContext.java
/**
 * This interface represents the context in which operations must be applied to all
 * partitions.
 */
public interface NonPartitionedContext<OUT> extends RuntimeContext {
    /**
     * Apply a function to all partitions. For a keyed stream, it will apply to all keys. For a
     * non-keyed stream, it will apply to the single partition.
     */
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}

/** A function to be applied to all partitions. */
@FunctionalInterface
public interface ApplyPartitionFunction<OUT> {
    /**
     * The actual method to be applied to each partition.
     *
     * @param collector to output data.
     * @param ctx runtime context in which this function is executed.
     */
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}
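
The "apply to all keys" behaviour for a keyed stream can be sketched with a small in-memory context. This is only an illustration under stated assumptions: the interfaces are re-declared locally, `InMemoryKeyedContext` is a hypothetical class, and a real runtime would also switch the current key before each invocation, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.List;

// Local copies of the proposed interfaces so that the sketch compiles on its own.
interface RuntimeContext {}

interface Collector<OUT> {
    void collect(OUT record);

    void collectAndOverwriteTimestamp(OUT record, long timestamp);
}

@FunctionalInterface
interface ApplyPartitionFunction<OUT> {
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}

interface NonPartitionedContext<OUT> extends RuntimeContext {
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}

/** Hypothetical in-memory context for a keyed stream: every key is one partition. */
final class InMemoryKeyedContext<K, OUT> implements NonPartitionedContext<OUT> {
    private final List<K> keys;
    private final List<OUT> emitted = new ArrayList<>();

    InMemoryKeyedContext(List<K> keys) {
        this.keys = keys;
    }

    @Override
    public void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction) {
        Collector<OUT> collector = new Collector<OUT>() {
            @Override
            public void collect(OUT record) {
                emitted.add(record);
            }

            @Override
            public void collectAndOverwriteTimestamp(OUT record, long timestamp) {
                emitted.add(record); // the timestamp is ignored in this sketch
            }
        };
        for (K key : keys) {
            // Invoke the function once per key; a real runtime would set `key` as current key.
            try {
                applyPartitionFunction.apply(collector, this);
            } catch (Exception e) {
                // The interface method declares no checked exception, so wrap it.
                throw new RuntimeException(e);
            }
        }
    }

    /** Returns everything emitted so far, in emission order. */
    List<OUT> emitted() {
        return emitted;
    }
}
```

With three keys, a function that emits one record per invocation produces three records, one per partition.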

TwoOutputNonPartitionedContext

Code Block (java): TwoOutputNonPartitionedContext.java
/**
 * This interface represents the context in which operations must be applied to all
 * partitions with two outputs.
 */
public interface TwoOutputNonPartitionedContext<OUT1, OUT2> extends RuntimeContext {
    /**
     * Apply a function to all partitions. For a keyed stream, it will apply to all keys. For a
     * non-keyed stream, it will apply to the single partition.
     */
    void applyToAllPartitions(TwoOutputApplyPartitionFunction<OUT1, OUT2> applyPartitionFunction);
}

/** A function to be applied to all partitions with two outputs. */
@FunctionalInterface
public interface TwoOutputApplyPartitionFunction<OUT1, OUT2> {
    /**
     * The actual method to be applied to each partition.
     *
     * @param firstOutput to emit record to first output.
     * @param secondOutput to emit record to second output.
     * @param ctx runtime context in which this function is executed.
     */
    void apply(Collector<OUT1> firstOutput, Collector<OUT2> secondOutput, RuntimeContext ctx)
            throws Exception;
}

...

OneInputStreamProcessFunction

Code Block (java): OneInputStreamProcessFunction.java
/** This interface contains all logic related to processing records from a single input. */
public interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction {
    /**
     * Process a record and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecord(IN record, Collector<OUT> output, RuntimeContext ctx) throws Exception;

    /**
     * This is a life-cycle method indicating that this function will no longer receive any input
     * data.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endInput(NonPartitionedContext<OUT> ctx) {}
}

TwoInputNonBroadcastStreamProcessFunction

Code Block (java): TwoInputNonBroadcastStreamProcessFunction.java
/** This interface contains all logic related to processing records from two non-broadcast inputs. */
public interface TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    /**
     * Process a record from the first input and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecordFromFirstInput(IN1 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    /**
     * Process a record from the second input and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecordFromSecondInput(IN2 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    /**
     * This is a life-cycle method indicating that this function will no longer receive any data
     * from the first input.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endFirstInput(NonPartitionedContext<OUT> ctx) {}

    /**
     * This is a life-cycle method indicating that this function will no longer receive any data
     * from the second input.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endSecondInput(NonPartitionedContext<OUT> ctx) {}
}

TwoInputBroadcastStreamProcessFunction

Code Block (java): TwoInputBroadcastStreamProcessFunction.java
/**
 * This interface contains all logic related to processing records from a broadcast stream and a
 * non-broadcast stream.
 */
public interface TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    /**
     * Process a record from the non-broadcast input and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecordFromNonBroadcastInput(IN1 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    /**
     * Process a record from the broadcast input. In general, the broadcast side is not allowed to
     * manipulate state and output data because it corresponds to all partitions instead of a single
     * partition. But you could use the broadcast context to process all the partitions at once.
     *
     * @param record to process.
     * @param ctx the context in which this function is executed.
     */
    void processRecordFromBroadcastInput(IN2 record, NonPartitionedContext<OUT> ctx) throws Exception;

    /**
     * This is a life-cycle method indicating that this function will no longer receive any data
     * from the non-broadcast input.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endNonBroadcastInput(NonPartitionedContext<OUT> ctx) {}

    /**
     * This is a life-cycle method indicating that this function will no longer receive any data
     * from the broadcast input.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endBroadcastInput(NonPartitionedContext<OUT> ctx) {}
}
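
The asymmetry between the two inputs of TwoInputBroadcastStreamProcessFunction can be illustrated with a short sketch. The interfaces are re-declared locally (life-cycle hooks omitted, `ProcessFunction` assumed to be a plain marker interface), and `RuleTagger` is a hypothetical function: a non-broadcast record is emitted directly through its Collector, while a broadcast record can only produce output by going through `applyToAllPartitions`. The rule is kept in a plain field here, which is an assumption of the sketch and not the managed state that later sub-FLIPs will describe.

```java
// Local stand-ins for the proposed interfaces so that the sketch compiles on its own.
interface RuntimeContext {}

interface ProcessFunction {}

interface Collector<OUT> {
    void collect(OUT record);

    void collectAndOverwriteTimestamp(OUT record, long timestamp);
}

@FunctionalInterface
interface ApplyPartitionFunction<OUT> {
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}

interface NonPartitionedContext<OUT> extends RuntimeContext {
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}

interface TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    void processRecordFromNonBroadcastInput(IN1 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    void processRecordFromBroadcastInput(IN2 record, NonPartitionedContext<OUT> ctx)
            throws Exception;
}

/** Hypothetical function: tags events with the latest broadcast rule and announces rule changes. */
final class RuleTagger implements TwoInputBroadcastStreamProcessFunction<String, String, String> {
    /** Latest rule seen on the broadcast side; a plain field, not managed state. */
    private String currentRule = "none";

    @Override
    public void processRecordFromNonBroadcastInput(
            String event, Collector<String> output, RuntimeContext ctx) {
        // Single-partition processing: emit directly through the collector.
        output.collect(event + "@" + currentRule);
    }

    @Override
    public void processRecordFromBroadcastInput(String rule, NonPartitionedContext<String> ctx) {
        currentRule = rule;
        // No collector is available here; output must go through all partitions.
        ctx.applyToAllPartitions((out, rc) -> out.collect("rule-changed:" + rule));
    }
}
```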

TwoOutputStreamProcessFunction

Code Block (java): TwoOutputStreamProcessFunction.java
/** This interface contains all logic related to processing and emitting records to two outputs. */
public interface TwoOutputStreamProcessFunction<IN, OUT1, OUT2> extends ProcessFunction {
    /**
     * Process and emit a record to the first/second output through {@link Collector}s.
     *
     * @param record to process.
     * @param output1 to emit processed records to the first output.
     * @param output2 to emit processed records to the second output.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecord(
            IN record, Collector<OUT1> output1, Collector<OUT2> output2, RuntimeContext ctx);

    /**
     * This is a life-cycle method indicating that this function will no longer receive any input
     * data.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endInput(TwoOutputNonPartitionedContext<OUT1, OUT2> ctx) {}
}

...

As the interfaces above show, each process function provides an endInput life-cycle hook. The runtime engine calls this method after all data of the corresponding input has been processed, giving the function a final opportunity to send data downstream. This is crucial for implementing operations such as a full-aggregation window.
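
As a rough illustration of why the endInput hook matters, the sketch below accumulates a sum while records arrive and emits it only once the input has ended. The interfaces are re-declared locally so that the snippet compiles on its own, `ProcessFunction` is assumed to be a plain marker interface, and `SumOnEndInput` is a hypothetical function. The running sum lives in a plain field, so the sketch fits a non-keyed stream with a single partition; per-key aggregation would need the state access planned for later sub-FLIPs.

```java
// Local stand-ins for the proposed interfaces so that the sketch compiles on its own.
interface RuntimeContext {}

interface ProcessFunction {}

interface Collector<OUT> {
    void collect(OUT record);

    void collectAndOverwriteTimestamp(OUT record, long timestamp);
}

@FunctionalInterface
interface ApplyPartitionFunction<OUT> {
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}

interface NonPartitionedContext<OUT> extends RuntimeContext {
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}

interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction {
    void processRecord(IN record, Collector<OUT> output, RuntimeContext ctx) throws Exception;

    default void endInput(NonPartitionedContext<OUT> ctx) {}
}

/** Hypothetical full aggregation: emits nothing per record, and the total at end of input. */
final class SumOnEndInput implements OneInputStreamProcessFunction<Long, Long> {
    /** Running total; a plain field, so this only suits a single non-keyed partition. */
    private long sum;

    @Override
    public void processRecord(Long record, Collector<Long> output, RuntimeContext ctx) {
        sum += record; // accumulate only, no output yet
    }

    @Override
    public void endInput(NonPartitionedContext<Long> ctx) {
        // Last chance to send data downstream: emit the aggregated result.
        ctx.applyToAllPartitions((out, rc) -> out.collect(sum));
    }
}
```

Without the hook, the function would have no point in time at which it knows the total is final.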

...

In general, we will expose four types of DataStream interfaces to users. Partitioning and processing can be applied to these data streams.

...

NonKeyedPartitionStream

Code Block (java): NonKeyedPartitionStream.java
/**
 * This class represents a kind of partitioned data stream. For this stream, each parallelism is a
 * partition, and the partition to which the data belongs is random.
 */
public interface NonKeyedPartitionStream<T> {
    /**
     * Apply an operation to this {@link NonKeyedPartitionStream}.
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <OUT> NonKeyedPartitionStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two output operation to this {@link NonKeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @return new stream with this operation.
     */
    <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply a two input operation to this and other {@link NonKeyedPartitionStream}.
     *
     * @param other {@link NonKeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            NonKeyedPartitionStream<T_OTHER> other,
            TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link BroadcastStream}.
     *
     * @param other {@link BroadcastStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            BroadcastStream<T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Coalesce this stream to a {@link GlobalStream}.
     *
     * @return the coalesced global stream.
     */
    GlobalStream<T> global();

    /**
     * Transform this stream to a {@link KeyedPartitionStream}.
     *
     * @param keySelector to decide how to map data to partition.
     * @return the transformed stream partitioned by key.
     */
    <K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector);

    /**
     * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between
     * these two streams.
     *
     * @return the transformed stream after shuffle.
     */
    NonKeyedPartitionStream<T> shuffle();

    /**
     * Transform this stream to a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    /**
     * Sink data from this stream.
     *
     * @param sink to receive data from this stream.
     */
    void toSink(Sink<T> sink);

    /**
     * This class represents a combination of two {@link NonKeyedPartitionStream}. It will be used
     * as the return value of operation with two output.
     */
    interface TwoNonKeyedPartitionStreams<T1, T2> {
        /** Get the first stream. */
        NonKeyedPartitionStream<T1> getFirst();

        /** Get the second stream. */
        NonKeyedPartitionStream<T2> getSecond();
    }
}

...

KeyedPartitionStream

Code Block (java): KeyedPartitionStream.java
/**
 * This class represents a kind of partitioned data stream. For this stream, each key group is a
 * partition, and the partition to which the data belongs is determined.
 */
public interface KeyedPartitionStream<K, T> {
    /**
     * Apply an operation to this {@link KeyedPartitionStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the same record, the new {@link KeySelector} must extract the same key as the
     * original {@link KeySelector} on this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <OUT> KeyedPartitionStream<K, OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);

    /**
     * Apply an operation to this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform operation.
     * @return new {@link NonKeyedPartitionStream} with this operation.
     */
    <OUT> NonKeyedPartitionStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two output operation to this {@link KeyedPartitionStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the same record, these new two {@link KeySelector}s must extract the same key as the
     * original {@link KeySelector}s on this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @param keySelector1 to select the key of first output.
     * @param keySelector2 to select the key of second output.
     * @return new {@link TwoKeyedPartitionStreams} with this operation.
     */
    <OUT1, OUT2> TwoKeyedPartitionStreams<K, OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction,
            KeySelector<OUT1, K> keySelector1,
            KeySelector<OUT2, K> keySelector2);

    /**
     * Apply a two output operation to this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @return new {@link TwoNonKeyedPartitionStreams} with this operation.
     */
    <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}. The two keyed
     * streams must have the same partitions, otherwise it makes no sense to connect them.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new {@link NonKeyedPartitionStream} with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}. The two keyed
     * streams must have the same partitions, otherwise it makes no sense to connect them.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the same record, the new {@link KeySelector} must extract the same key as the
     * original {@link KeySelector}s on these two {@link KeyedPartitionStream}s.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);

    /**
     * Apply a two input operation to this and other {@link BroadcastStream}.
     *
     * @param other {@link BroadcastStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            BroadcastStream<T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link BroadcastStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the record from non-broadcast input, the new {@link KeySelector} must extract the
     * same key as the original {@link KeySelector}s on the {@link KeyedPartitionStream}. For the
     * record from broadcast input, the output key comes from the keyed partition itself instead of
     * the new key selector, so it is already safe.
     *
     * @param other {@link BroadcastStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
            BroadcastStream<T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);

    /**
     * Coalesce this stream to a {@link GlobalStream}.
     *
     * @return the coalesced global stream.
     */
    GlobalStream<T> global();

    /**
     * Transform this stream to a new {@link KeyedPartitionStream}.
     *
     * @param keySelector to decide how to map data to partition.
     * @return the transformed stream partitioned by key.
     */
    <NEW_KEY> KeyedPartitionStream<NEW_KEY, T> keyBy(KeySelector<T, NEW_KEY> keySelector);

    /**
     * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between
     * these two streams.
     *
     * @return the transformed stream after shuffle.
     */
    NonKeyedPartitionStream<T> shuffle();

    /**
     * Transform this stream to a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    /**
     * Sink data from this stream.
     *
     * @param sink to receive data from this stream.
     */
    void toSink(Sink<T> sink);

    /**
     * This class represents a combination of two {@link KeyedPartitionStream}. It will be used as
     * the return value of operation with two output.
     */
    interface TwoKeyedPartitionStreams<K, T1, T2> {
        /** Get the first stream. */
        KeyedPartitionStream<K, T1> getFirst();

        /** Get the second stream. */
        KeyedPartitionStream<K, T2> getSecond();
    }
}
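
The shuffle-free `process` and `connectAndProcess` variants above rely on one invariant: for every record, the new key selector applied to the output must yield the same key that the original selector yielded for the input. A minimal sketch of that check follows. It uses `java.util.function.Function` as a stand-in for Flink's KeySelector, and `Order`, `OrderSummary` and `KeyPreservationSketch` are hypothetical names used only for illustration.

```java
import java.util.function.Function;

/** Hypothetical input record, keyed by userId. */
final class Order {
    final String userId;
    final long amount;

    Order(String userId, long amount) {
        this.userId = userId;
        this.amount = amount;
    }
}

/** Hypothetical output record that still carries the userId. */
final class OrderSummary {
    final String userId;
    final String text;

    OrderSummary(String userId, String text) {
        this.userId = userId;
        this.text = text;
    }
}

final class KeyPreservationSketch {
    private KeyPreservationSketch() {}

    /**
     * Returns true if the output produced for {@code input} maps to the same key as the input
     * itself, which is the precondition for keeping the stream keyed without a shuffle.
     */
    static <IN, OUT, K> boolean preservesKey(
            IN input,
            Function<IN, OUT> transform,
            Function<IN, K> originalKeySelector,
            Function<OUT, K> newKeySelector) {
        K before = originalKeySelector.apply(input);
        K after = newKeySelector.apply(transform.apply(input));
        return before.equals(after);
    }
}
```

A transform that copies `userId` unchanged satisfies the check, while one that rewrites it (for example to upper case) does not, and its records could end up in the wrong partition if the stream were still treated as keyed.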

...

Code Block (java): GlobalStream.java
/** This class represents a stream that forces single parallelism. */
public interface GlobalStream<T> {
    /**
     * Apply an operation to this {@link GlobalStream}.
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <OUT> GlobalStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two output operation to this {@link GlobalStream}.
     *
     * @param processFunction to perform two output operation.
     * @return new stream with this operation.
     */
    <OUT1, OUT2> TwoGlobalStream<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply a two input operation to this and other {@link GlobalStream}.
     *
     * @param other {@link GlobalStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> GlobalStream<OUT> connectAndProcess(
            GlobalStream<T_OTHER> other,
            TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Transform this stream to a {@link KeyedPartitionStream}.
     *
     * @param keySelector to decide how to map data to partition.
     * @return the transformed stream partitioned by key.
     */
    <K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector);

    /**
     * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between
     * these two streams.
     *
     * @return the transformed stream after shuffle.
     */
    NonKeyedPartitionStream<T> shuffle();

    /**
     * Transform this stream to a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    /**
     * Sink data from this stream.
     *
     * @param sink to receive data from this stream.
     */
    void toSink(Sink<T> sink);

    /**
     * This class represents a combination of two {@link GlobalStream}. It will be used as the
     * return value of operation with two output.
     */
    interface TwoGlobalStream<T1, T2> {
        /** Get the first stream. */
        GlobalStream<T1> getFirst();

        /** Get the second stream. */
        GlobalStream<T2> getSecond();
    }
}

...

Code Block (java): BroadcastStream.java
/** This class represents a stream in which each parallel task processes the same data. */
public interface BroadcastStream<T> {
    /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <K, T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link NonKeyedPartitionStream}.
     *
     * @param other {@link NonKeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            NonKeyedPartitionStream<T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the record from non-broadcast input, the new {@link KeySelector} must extract the
     * same key as the original {@link KeySelector}s on the {@link KeyedPartitionStream}. For the
     * record from broadcast input, the output key comes from the keyed partition itself instead of
     * the new key selector, so it is already safe.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <K, T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);
}

Similar to sources, we only support SinkV2-based sinks.

This FLIP needs to move the following classes from flink-core into the flink-core-api module:

Class Full Path

org.apache.flink.api.common.functions.Function

org.apache.flink.api.java.functions.KeySelector

Compatibility, Deprecation, and Migration Plan

...