...

Process Function | Number of inputs | Number of outputs
--- | --- | ---
OneInputStreamProcessFunction | 1 | 1
TwoInputNonBroadcastStreamProcessFunction | 2 | 1
TwoInputBroadcastStreamProcessFunction | 2 | 1
TwoOutputStreamProcessFunction | 1 | 2

...

The two-input case is relatively special, so we divide it into two categories:

  1. TwoInputNonBroadcastStreamProcessFunction: Neither of its inputs is a BroadcastStream, so processing is only applied to a single partition.
  2. TwoInputBroadcastStreamProcessFunction: One of its inputs is a BroadcastStream, so processing of that input is applied to all partitions, while the other input is a Keyed/Non-Keyed stream whose processing is applied to a single partition.
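The difference between the two categories comes down to which partitions see a given record. The following Java sketch is a toy model, not Flink's partitioner: the class `EdgeRouting` is hypothetical, only the keyed case of a regular edge is modeled, and the plain `floorMod(hashCode, n)` stands in for Flink's key-group based assignment. A keyed record lands in exactly one partition, while a broadcast record is delivered to all of them.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model of record routing (not Flink's actual partitioner). It only shows how
 * many partitions receive a record on a regular edge versus a broadcast edge.
 */
final class EdgeRouting {
    private EdgeRouting() {}

    /** Regular keyed edge: a record goes to exactly one partition, derived from its key. */
    static List<Integer> routeKeyed(Object key, int numPartitions) {
        List<Integer> targets = new ArrayList<>();
        // floorMod keeps the index in [0, numPartitions) even for negative hash codes.
        targets.add(Math.floorMod(key.hashCode(), numPartitions));
        return targets;
    }

    /** Broadcast edge: every partition receives its own copy of the record. */
    static List<Integer> routeBroadcast(int numPartitions) {
        List<Integer> targets = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            targets.add(p);
        }
        return targets;
    }
}
```

Because a broadcast copy reaches every partition, a TwoInputBroadcastStreamProcessFunction cannot treat it as belonging to one partition, which is why its broadcast-side method receives a NonPartitionedContext rather than a plain Collector.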

...

Code Block
languagejava
titleCollector.java
/** This class is responsible for collecting data to the output stream. */
public interface Collector<OUT> {
    /**
     * Collect record to output stream.
     *
     * @param record to be collected.
     */
    void collect(OUT record);

    /**
     * Overwrite the timestamp of this record and collect it to output stream.
     *
     * @param record to be collected.
     * @param timestamp the new timestamp to assign to the record.
     */
    void collectAndOverwriteTimestamp(OUT record, long timestamp);
}
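For illustration, a minimal in-memory Collector, the kind one might use to unit-test a process function, could look like the following. `ListCollector` and its constructor argument (an assumed "current" timestamp that plain `collect` keeps) are assumptions made for this sketch; the interface is re-declared so the block compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;

// Same shape as the Collector interface above, re-declared so this sketch compiles on its own.
interface Collector<OUT> {
    void collect(OUT record);

    void collectAndOverwriteTimestamp(OUT record, long timestamp);
}

/** Hypothetical buffering Collector that also records the timestamp attached to each record. */
final class ListCollector<OUT> implements Collector<OUT> {
    final List<OUT> records = new ArrayList<>();
    final List<Long> timestamps = new ArrayList<>();
    // Assumed timestamp of the record currently being processed.
    private final long currentTimestamp;

    ListCollector(long currentTimestamp) {
        this.currentTimestamp = currentTimestamp;
    }

    @Override
    public void collect(OUT record) {
        // Keeps the timestamp of the record currently being processed.
        records.add(record);
        timestamps.add(currentTimestamp);
    }

    @Override
    public void collectAndOverwriteTimestamp(OUT record, long timestamp) {
        // Replaces the timestamp with the caller-supplied one.
        records.add(record);
        timestamps.add(timestamp);
    }
}
```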

...

NonPartitionedContext

Sometimes it is not possible to determine the current partition while processing, for example, when processing input records from the broadcast edge. Therefore, we introduce a mechanism to process all partitions.

Note: RuntimeContext contains information about the context in which process functions are executed. It is currently an empty interface but will be extended later, for example to support accessing state and registering timers. This part will be elaborated in subsequent sub-FLIPs.

Code Block
languagejava
titleNonPartitionedContext.java
/**
 * This interface represents the context associated with all operations that must be applied to
 * all partitions.
 */
public interface NonPartitionedContext<OUT> extends RuntimeContext {
    /**
     * Apply a function to all partitions. For a keyed stream, it will be applied to all keys. For a
     * non-keyed stream, it will be applied to the single partition.
     */
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}

/** A function to be applied to all partitions. */
@FunctionalInterface
public interface ApplyPartitionFunction<OUT> {
    /**
     * The actual method to be applied to each partition.
     *
     * @param collector to output data.
     * @param ctx runtime context in which this function is executed.
     */
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}

/**
 * This interface represents the context associated with all operations that must be applied to
 * all partitions with two outputs.
 */
public interface TwoOutputNonPartitionedContext<OUT1, OUT2> extends RuntimeContext {
    /**
     * Apply a function to all partitions. For a keyed stream, it will be applied to all keys. For a
     * non-keyed stream, it will be applied to the single partition.
     */
    void applyToAllPartitions(TwoOutputApplyPartitionFunction<OUT1, OUT2> applyPartitionFunction);
}

/** A function to be applied to all partitions with two outputs. */
@FunctionalInterface
public interface TwoOutputApplyPartitionFunction<OUT1, OUT2> {
    /**
     * The actual method to be applied to each partition.
     *
     * @param firstOutput to emit records to the first output.
     * @param secondOutput to emit records to the second output.
     * @param ctx runtime context in which this function is executed.
     */
    void apply(Collector<OUT1> firstOutput, Collector<OUT2> secondOutput, RuntimeContext ctx)
            throws Exception;
}
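The following sketch shows one way the keyed case of `applyToAllPartitions` could behave: the function is invoked once per key, each time with a context scoped to that key. `KeyedNonPartitionedContext`, `KeyScopedContext` and `ListCollector` are hypothetical helpers, and the interfaces are simplified re-declarations of those above. A non-keyed implementation would instead invoke the function once, for its single partition.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified re-declarations of the interfaces above, so this sketch compiles on its own.
interface RuntimeContext {}

interface Collector<OUT> {
    void collect(OUT record);

    void collectAndOverwriteTimestamp(OUT record, long timestamp);
}

@FunctionalInterface
interface ApplyPartitionFunction<OUT> {
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}

interface NonPartitionedContext<OUT> extends RuntimeContext {
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}

/** Hypothetical collector that just buffers records (timestamps are ignored here). */
final class ListCollector<OUT> implements Collector<OUT> {
    final List<OUT> records = new ArrayList<>();

    @Override
    public void collect(OUT record) {
        records.add(record);
    }

    @Override
    public void collectAndOverwriteTimestamp(OUT record, long timestamp) {
        records.add(record);
    }
}

/** Hypothetical runtime context that only exposes the key currently in scope. */
final class KeyScopedContext<K> implements RuntimeContext {
    final K currentKey;

    KeyScopedContext(K currentKey) {
        this.currentKey = currentKey;
    }
}

/** Hypothetical keyed implementation: invokes the function once per distinct key. */
final class KeyedNonPartitionedContext<K, OUT> implements NonPartitionedContext<OUT> {
    private final Set<K> keys = new LinkedHashSet<>(); // de-duplicated, insertion order kept
    private final Collector<OUT> output;

    KeyedNonPartitionedContext(Iterable<K> knownKeys, Collector<OUT> output) {
        for (K key : knownKeys) {
            keys.add(key);
        }
        this.output = output;
    }

    @Override
    public void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction) {
        for (K key : keys) {
            try {
                // Each call sees a context scoped to one key, i.e. one logical partition.
                applyPartitionFunction.apply(output, new KeyScopedContext<>(key));
            } catch (Exception e) {
                throw new RuntimeException("apply failed for key " + key, e);
            }
        }
    }
}
```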

OneInputStreamProcessFunction

Code Block
languagejava
titleOneInputStreamProcessFunction.java
/** This class contains all logic related to processing records from a single input. */
public interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction {
    /**
     * Process record and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecord(IN record, Collector<OUT> output, RuntimeContext ctx) throws Exception;

    /**
     * This is a life-cycle method indicating that this function will no longer receive any input
     * data.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endInput(NonPartitionedContext<OUT> ctx) {}
}
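As a usage sketch, a user-defined OneInputStreamProcessFunction that parses integers and drops malformed lines might look like this. The interfaces are re-declared in simplified form (for instance, `NonPartitionedContext` without `applyToAllPartitions`), and `ParseIntegers` and `ListCollector` are illustrative names, not part of the proposal.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal re-declarations of the proposed interfaces so the sketch compiles on its own.
interface ProcessFunction {}

interface RuntimeContext {}

interface Collector<OUT> {
    void collect(OUT record);

    void collectAndOverwriteTimestamp(OUT record, long timestamp);
}

// applyToAllPartitions is left out; this sketch does not use it.
interface NonPartitionedContext<OUT> extends RuntimeContext {}

interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction {
    void processRecord(IN record, Collector<OUT> output, RuntimeContext ctx) throws Exception;

    default void endInput(NonPartitionedContext<OUT> ctx) {}
}

/** Hypothetical user function: parses integers and drops lines that are not numbers. */
final class ParseIntegers implements OneInputStreamProcessFunction<String, Integer> {
    @Override
    public void processRecord(String record, Collector<Integer> output, RuntimeContext ctx) {
        try {
            output.collect(Integer.parseInt(record.trim()));
        } catch (NumberFormatException e) {
            // Emitting nothing is allowed: a record may yield zero, one, or many outputs.
        }
    }
}

/** Hypothetical buffering collector used to observe the output. */
final class ListCollector<OUT> implements Collector<OUT> {
    final List<OUT> records = new ArrayList<>();

    @Override
    public void collect(OUT record) {
        records.add(record);
    }

    @Override
    public void collectAndOverwriteTimestamp(OUT record, long timestamp) {
        records.add(record);
    }
}
```

Because output goes through a Collector rather than a return value, the same interface covers filtering, one-to-one mapping, and one-to-many expansion.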

...

TwoInputNonBroadcastStreamProcessFunction

Code Block
languagejava
titleTwoInputNonBroadcastStreamProcessFunction.java
/** This class contains all logic related to processing records from two non-broadcast inputs. */
public interface TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    /**
     * Process record from the first input and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecordFromFirstInput(IN1 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    /**
     * Process record from the second input and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecordFromSecondInput(IN2 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    /**
     * This is a life-cycle method indicating that this function will no longer receive any data
     * from the first input.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endFirstInput(NonPartitionedContext<OUT> ctx) {}

    /**
     * This is a life-cycle method indicating that this function will no longer receive any data
     * from the second input.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endSecondInput(NonPartitionedContext<OUT> ctx) {}
}
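A minimal, hypothetical implementation that tags records from both inputs with their origin and emits them as a single output type might look like this. The interfaces are simplified re-declarations, and `TagBySource` and `ListCollector` are illustrative names. Because neither input is a BroadcastStream, each method simply emits through its own Collector.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified re-declarations of the proposed interfaces so the sketch compiles on its own.
interface ProcessFunction {}

interface RuntimeContext {}

interface Collector<OUT> {
    void collect(OUT record);

    void collectAndOverwriteTimestamp(OUT record, long timestamp);
}

// applyToAllPartitions is left out; this sketch does not use it.
interface NonPartitionedContext<OUT> extends RuntimeContext {}

interface TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    void processRecordFromFirstInput(IN1 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    void processRecordFromSecondInput(IN2 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    default void endFirstInput(NonPartitionedContext<OUT> ctx) {}

    default void endSecondInput(NonPartitionedContext<OUT> ctx) {}
}

/** Hypothetical function that maps two differently typed inputs onto one output type. */
final class TagBySource
        implements TwoInputNonBroadcastStreamProcessFunction<String, Integer, String> {
    @Override
    public void processRecordFromFirstInput(
            String record, Collector<String> output, RuntimeContext ctx) {
        output.collect("first:" + record);
    }

    @Override
    public void processRecordFromSecondInput(
            Integer record, Collector<String> output, RuntimeContext ctx) {
        output.collect("second:" + record);
    }
}

/** Hypothetical buffering collector used to observe the output. */
final class ListCollector<OUT> implements Collector<OUT> {
    final List<OUT> records = new ArrayList<>();

    @Override
    public void collect(OUT record) {
        records.add(record);
    }

    @Override
    public void collectAndOverwriteTimestamp(OUT record, long timestamp) {
        records.add(record);
    }
}
```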

TwoInputBroadcastStreamProcessFunction

Code Block
languagejava
titleTwoInputBroadcastStreamProcessFunction.java
/**
 * This class contains all logic related to processing records from a broadcast stream and a
 * non-broadcast stream.
 */
public interface TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    /**
     * Process record from the non-broadcast input and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecordFromNonBroadcastInput(IN1 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    /**
     * Process record from the broadcast input. In general, the broadcast side is not allowed to
     * manipulate state or output data directly, because it corresponds to all partitions instead of
     * a single partition. But you can use the {@link NonPartitionedContext} to process all the
     * partitions at once.
     *
     * @param record to process.
     * @param ctx the context in which this function is executed.
     */
    void processRecordFromBroadcastInput(IN2 record, NonPartitionedContext<OUT> ctx)
            throws Exception;

    /**
     * This is a life-cycle method indicating that this function will no longer receive any data
     * from the non-broadcast input.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endNonBroadcastInput(NonPartitionedContext<OUT> ctx) {}

    /**
     * This is a life-cycle method indicating that this function will no longer receive any data
     * from the broadcast input.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endBroadcastInput(NonPartitionedContext<OUT> ctx) {}
}
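The sketch below illustrates the split described in the Javadoc: non-broadcast records are emitted through a Collector, whereas a broadcast record can only produce output through `NonPartitionedContext#applyToAllPartitions`. `ForwardWithControl`, `FixedPartitionsContext` (a toy context that pretends this instance owns a fixed number of partitions, as a keyed stream with that many keys would) and `ListCollector` are hypothetical; the interfaces are simplified re-declarations, with the end-of-input hooks omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified re-declarations of the proposed interfaces so the sketch compiles on its own.
interface ProcessFunction {}

interface RuntimeContext {}

interface Collector<OUT> {
    void collect(OUT record);

    void collectAndOverwriteTimestamp(OUT record, long timestamp);
}

@FunctionalInterface
interface ApplyPartitionFunction<OUT> {
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}

interface NonPartitionedContext<OUT> extends RuntimeContext {
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}

interface TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    void processRecordFromNonBroadcastInput(IN1 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    void processRecordFromBroadcastInput(IN2 record, NonPartitionedContext<OUT> ctx)
            throws Exception;
}

/** Hypothetical function: forwards data records, fans control messages out to all partitions. */
final class ForwardWithControl
        implements TwoInputBroadcastStreamProcessFunction<String, String, String> {
    @Override
    public void processRecordFromNonBroadcastInput(
            String record, Collector<String> output, RuntimeContext ctx) {
        // A non-broadcast record belongs to one partition, so it is emitted directly.
        output.collect("data:" + record);
    }

    @Override
    public void processRecordFromBroadcastInput(String record, NonPartitionedContext<String> ctx) {
        // No Collector is passed here; output must go through applyToAllPartitions.
        ctx.applyToAllPartitions((output, partitionCtx) -> output.collect("control:" + record));
    }
}

/** Toy context that pretends this instance owns a fixed number of partitions. */
final class FixedPartitionsContext<OUT> implements NonPartitionedContext<OUT> {
    private final int numPartitions;
    private final Collector<OUT> output;

    FixedPartitionsContext(int numPartitions, Collector<OUT> output) {
        this.numPartitions = numPartitions;
        this.output = output;
    }

    @Override
    public void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction) {
        for (int p = 0; p < numPartitions; p++) {
            try {
                applyPartitionFunction.apply(output, this);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}

/** Hypothetical buffering collector used to observe the output. */
final class ListCollector<OUT> implements Collector<OUT> {
    final List<OUT> records = new ArrayList<>();

    @Override
    public void collect(OUT record) {
        records.add(record);
    }

    @Override
    public void collectAndOverwriteTimestamp(OUT record, long timestamp) {
        records.add(record);
    }
}
```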

TwoOutputStreamProcessFunction

Code Block
languagejava
titleTwoOutputStreamProcessFunction.java
/** This class contains all logic related to processing and emitting records to two outputs. */
public interface TwoOutputStreamProcessFunction<IN, OUT1, OUT2> extends ProcessFunction {
    /**
     * Process record and emit results to the first/second output through {@link Collector}s.
     *
     * @param record to process.
     * @param output1 to emit processed records to the first output.
     * @param output2 to emit processed records to the second output.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecord(
            IN record, Collector<OUT1> output1, Collector<OUT2> output2, RuntimeContext ctx);

    /**
     * This is a life-cycle method indicating that this function will no longer receive any input
     * data.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endInput(TwoOutputNonPartitionedContext<OUT1, OUT2> ctx) {}
}
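For example, a hypothetical TwoOutputStreamProcessFunction that routes parseable integers to the first output and everything else to the second (a side-output style split) could be written as follows. The interfaces are simplified re-declarations, and `TwoOutputNonPartitionedContext` is reduced to an empty stub because the sketch does not use it; `SplitByValidity` and `ListCollector` are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified re-declarations of the proposed interfaces so the sketch compiles on its own.
interface ProcessFunction {}

interface RuntimeContext {}

interface Collector<OUT> {
    void collect(OUT record);

    void collectAndOverwriteTimestamp(OUT record, long timestamp);
}

// Reduced to an empty stub: applyToAllPartitions is not needed for this example.
interface TwoOutputNonPartitionedContext<OUT1, OUT2> extends RuntimeContext {}

interface TwoOutputStreamProcessFunction<IN, OUT1, OUT2> extends ProcessFunction {
    void processRecord(
            IN record, Collector<OUT1> output1, Collector<OUT2> output2, RuntimeContext ctx);

    default void endInput(TwoOutputNonPartitionedContext<OUT1, OUT2> ctx) {}
}

/** Hypothetical splitter: valid integers go to output1, unparseable lines to output2. */
final class SplitByValidity implements TwoOutputStreamProcessFunction<String, Integer, String> {
    @Override
    public void processRecord(
            String record, Collector<Integer> valid, Collector<String> invalid, RuntimeContext ctx) {
        try {
            valid.collect(Integer.parseInt(record.trim()));
        } catch (NumberFormatException e) {
            // Keep the raw line on the second output so it can be inspected downstream.
            invalid.collect(record);
        }
    }
}

/** Hypothetical buffering collector used to observe each output. */
final class ListCollector<OUT> implements Collector<OUT> {
    final List<OUT> records = new ArrayList<>();

    @Override
    public void collect(OUT record) {
        records.add(record);
    }

    @Override
    public void collectAndOverwriteTimestamp(OUT record, long timestamp) {
        records.add(record);
    }
}
```

Since each output has its own type parameter, the two resulting streams can carry different record types, which is what the TwoNonKeyedPartitionStreams, TwoKeyedPartitionStreams and TwoGlobalStream return types below expose.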

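We can see that each process function provides a life-cycle hook for the end of its input: endInput for single-input functions, and one hook per input (for example, endFirstInput/endSecondInput) for two-input functions. The runtime engine will call this method after all data of the corresponding input has been processed, providing the final opportunity to send data downstream. This is crucial for implementing features such as full-aggregation windows.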
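A full aggregation over the whole input then becomes straightforward: accumulate in `processRecord`, emit once in `endInput`. In the sketch below, `SumAll`, `SinglePartitionContext` and `ListCollector` are hypothetical, the interfaces are simplified re-declarations, and a plain Java field stands in for state, since RuntimeContext does not expose state yet. The setup mirrors a non-keyed stream, where one function instance serves one partition.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified re-declarations of the proposed interfaces so the sketch compiles on its own.
interface ProcessFunction {}

interface RuntimeContext {}

interface Collector<OUT> {
    void collect(OUT record);

    void collectAndOverwriteTimestamp(OUT record, long timestamp);
}

@FunctionalInterface
interface ApplyPartitionFunction<OUT> {
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}

interface NonPartitionedContext<OUT> extends RuntimeContext {
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}

interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction {
    void processRecord(IN record, Collector<OUT> output, RuntimeContext ctx) throws Exception;

    default void endInput(NonPartitionedContext<OUT> ctx) {}
}

/** Hypothetical full aggregation: nothing is emitted until the input ends. */
final class SumAll implements OneInputStreamProcessFunction<Long, Long> {
    // A plain field stands in for state, which RuntimeContext does not expose yet.
    private long sum;

    @Override
    public void processRecord(Long record, Collector<Long> output, RuntimeContext ctx) {
        sum += record; // accumulate only; no per-record output
    }

    @Override
    public void endInput(NonPartitionedContext<Long> ctx) {
        final long total = sum;
        // Last chance to send data downstream: emit the aggregate once.
        ctx.applyToAllPartitions((output, partitionCtx) -> output.collect(total));
    }
}

/** Hypothetical non-keyed context: the function runs once, for this single partition. */
final class SinglePartitionContext<OUT> implements NonPartitionedContext<OUT> {
    private final Collector<OUT> output;

    SinglePartitionContext(Collector<OUT> output) {
        this.output = output;
    }

    @Override
    public void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction) {
        try {
            applyPartitionFunction.apply(output, this);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

/** Hypothetical buffering collector used to observe the output. */
final class ListCollector<OUT> implements Collector<OUT> {
    final List<OUT> records = new ArrayList<>();

    @Override
    public void collect(OUT record) {
        records.add(record);
    }

    @Override
    public void collectAndOverwriteTimestamp(OUT record, long timestamp) {
        records.add(record);
    }
}
```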

...

In general, we will expose four types of DataStream interfaces to users (NonKeyedPartitionStream, KeyedPartitionStream, GlobalStream, and BroadcastStream), and partitioning and processing can be applied to these streams.

...

NonKeyedPartitionStream

Code Block
languagejava
titleNonKeyedPartitionStream.java
/**
 * This class represents a kind of partitioned data stream. For this stream, each parallel instance
 * is a partition, and the partition to which a record belongs is random.
 */
public interface NonKeyedPartitionStream<T> {
    /**
     * Apply an operation to this {@link NonKeyedPartitionStream}.
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <OUT> NonKeyedPartitionStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two output operation to this {@link NonKeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @return new stream with this operation.
     */
    <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply a two input operation to this and other {@link NonKeyedPartitionStream}.
     *
     * @param other {@link NonKeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            NonKeyedPartitionStream<T_OTHER> other,
            TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link BroadcastStream}.
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            BroadcastStream<T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Coalesce this stream to a {@link GlobalStream}.
     *
     * @return the coalesced global stream.
     */
    GlobalStream<T> global();

    /**
     * Transform this stream to a {@link KeyedPartitionStream}.
     *
     * @param keySelector to decide how to map data to partition.
     * @return the transformed stream partitioned by key.
     */
    <K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector);

    /**
     * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between
     * these two streams.
     *
     * @return the transformed stream after shuffle.
     */
    NonKeyedPartitionStream<T> shuffle();

    /**
     * Transform this stream to a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    /**
     * Sink data from this stream.
     *
     * @param sink to receive data from this stream.
     */
    void toSink(Sink<T> sink);

    /**
     * This class represents a combination of two {@link NonKeyedPartitionStream}. It will be used
     * as the return value of operation with two output.
     */
    interface TwoNonKeyedPartitionStreams<T1, T2> {
        /** Get the first stream. */
        NonKeyedPartitionStream<T1> getFirst();

        /** Get the second stream. */
        NonKeyedPartitionStream<T2> getSecond();
    }
}

...

KeyedPartitionStream

Code Block
languagejava
titleKeyedPartitionStream.java
/**
 * This class represents a kind of partitioned data stream. For this stream, each key group is a
 * partition, and the partition to which a record belongs is determined by its key.
 */
public interface KeyedPartitionStream<K, T> {
    /**
     * Apply an operation to this {@link KeyedPartitionStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the same record, the new {@link KeySelector} must extract the same key as the
     * original {@link KeySelector} on this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <OUT> KeyedPartitionStream<K, OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);

    /**
     * Apply an operation to this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform operation.
     * @return new {@link NonKeyedPartitionStream} with this operation.
     */
    <OUT> NonKeyedPartitionStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two output operation to this {@link KeyedPartitionStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the same record, these two new {@link KeySelector}s must extract the same key as the
     * original {@link KeySelector} on this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @param keySelector1 to select the key of first output.
     * @param keySelector2 to select the key of second output.
     * @return new {@link TwoKeyedPartitionStreams} with this operation.
     */
    <OUT1, OUT2> TwoKeyedPartitionStreams<K, OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction,
            KeySelector<OUT1, K> keySelector1,
            KeySelector<OUT2, K> keySelector2);

    /**
     * Apply a two output operation to this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @return new {@link TwoNonKeyedPartitionStreams} with this operation.
     */
    <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}. The two keyed
     * streams must have the same partitions, otherwise it makes no sense to connect them.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new {@link NonKeyedPartitionStream} with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}. The two keyed
     * streams must have the same partitions, otherwise it makes no sense to connect them.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the same record, the new {@link KeySelector} must extract the same key as the
     * original {@link KeySelector}s on these two {@link KeyedPartitionStream}s.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);

    /**
     * Apply a two input operation to this and other {@link BroadcastStream}.
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            BroadcastStream<T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link BroadcastStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the record from non-broadcast input, the new {@link KeySelector} must extract the
     * same key as the original {@link KeySelector} on the {@link KeyedPartitionStream}. For the
     * record from broadcast input, the output key comes from the keyed partition itself instead of
     * the new key selector, so it is already safe.
     *
     * @param other {@link BroadcastStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
            BroadcastStream<T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);

    /**
     * Coalesce this stream to a {@link GlobalStream}.
     *
     * @return the coalesced global stream.
     */
    GlobalStream<T> global();

    /**
     * Transform this stream to a new {@link KeyedPartitionStream}.
     *
     * @param keySelector to decide how to map data to partition.
     * @return the transformed stream partitioned by key.
     */
    <NEW_KEY> KeyedPartitionStream<NEW_KEY, T> keyBy(KeySelector<T, NEW_KEY> keySelector);

    /**
     * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between
     * these two streams.
     *
     * @return the transformed stream after shuffle.
     */
    NonKeyedPartitionStream<T> shuffle();

    /**
     * Transform this stream to a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    /**
     * Sink data from this stream.
     *
     * @param sink to receive data from this stream.
     */
    void toSink(Sink<T> sink);

    /**
     * This class represents a combination of two {@link KeyedPartitionStream}. It will be used as
     * the return value of operation with two output.
     */
    interface TwoKeyedPartitionStreams<K, T1, T2> {
        /** Get the first stream. */
        KeyedPartitionStream<K, T1> getFirst();

        /** Get the second stream. */
        KeyedPartitionStream<K, T2> getSecond();
    }
}
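The key-preservation requirement on the key-preserving variants (those taking a `newKeySelector` or `keySelector1`/`keySelector2`) can be checked on sample data before relying on it. The helper below is a hypothetical test-time check, not part of the proposed API, and the sketch does not claim the runtime enforces the requirement; `Event`, `Summary` and `KeyPreservation` are made-up types, a `java.util.function.Function` stands in for the process function, and `KeySelector` is re-declared in simplified form.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Simplified stand-in for org.apache.flink.api.java.functions.KeySelector.
@FunctionalInterface
interface KeySelector<IN, KEY> {
    KEY getKey(IN value) throws Exception;
}

/** Hypothetical input record, keyed by user id. */
final class Event {
    final String userId;
    final long amount;

    Event(String userId, long amount) {
        this.userId = userId;
        this.amount = amount;
    }
}

/** Hypothetical output record; it still carries the user id. */
final class Summary {
    final String userId;
    final String label;

    Summary(String userId, String label) {
        this.userId = userId;
        this.label = label;
    }
}

final class KeyPreservation {
    private KeyPreservation() {}

    /**
     * Returns true if, for every sample, the new key selector applied to the processed output
     * yields the same key as the original selector applied to the input.
     */
    static <IN, OUT, K> boolean preservesKey(
            List<IN> samples,
            Function<IN, OUT> process,
            KeySelector<IN, K> originalKeySelector,
            KeySelector<OUT, K> newKeySelector) {
        try {
            for (IN record : samples) {
                OUT result = process.apply(record);
                if (!Objects.equals(
                        originalKeySelector.getKey(record), newKeySelector.getKey(result))) {
                    return false;
                }
            }
            return true;
        } catch (Exception e) {
            throw new IllegalStateException("key extraction failed", e);
        }
    }
}
```

For example, a function that maps `Event` to `Summary` while copying `userId` satisfies the requirement when the new key selector reads `Summary.userId`; a selector that reads `label` instead would not, and records would no longer be guaranteed to sit in the partition their new key implies.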

...

Code Block
languagejava
titleGlobalStream.java
/** This class represents a stream that forces single parallelism. */
public interface GlobalStream<T> {
    /**
     * Apply an operation to this {@link GlobalStream}.
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <OUT> GlobalStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two output operation to this {@link GlobalStream}.
     *
     * @param processFunction to perform two output operation.
     * @return new stream with this operation.
     */
    <OUT1, OUT2> TwoGlobalStream<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply a two input operation to this and other {@link GlobalStream}.
     *
     * @param other {@link GlobalStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> GlobalStream<OUT> connectAndProcess(
            GlobalStream<T_OTHER> other,
            TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Transform this stream to a {@link KeyedPartitionStream}.
     *
     * @param keySelector to decide how to map data to partition.
     * @return the transformed stream partitioned by key.
     */
    <K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector);

    /**
     * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between
     * these two streams.
     *
     * @return the transformed stream after shuffle.
     */
    NonKeyedPartitionStream<T> shuffle();

    /**
     * Transform this stream to a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    /**
     * Sink data from this stream.
     *
     * @param sink to receive data from this stream.
     */
    void toSink(Sink<T> sink);

    /**
     * This class represents a combination of two {@link GlobalStream}. It will be used as the
     * return value of operation with two output.
     */
    interface TwoGlobalStream<T1, T2> {
        /** Get the first stream. */
        GlobalStream<T1> getFirst();

        /** Get the second stream. */
        GlobalStream<T2> getSecond();
    }
}

...

Code Block
languagejava
titleBroadcastStream.java
/** This class represents a stream in which each parallel task processes the same data. */
public interface BroadcastStream<T> {
    /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <K, T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T_OTHER, T, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link NonKeyedPartitionStream}.
     *
     * @param other {@link NonKeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            NonKeyedPartitionStream<T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T_OTHER, T, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the record from non-broadcast input, the new {@link KeySelector} must extract the
     * same key as the original {@link KeySelector}s on the {@link KeyedPartitionStream}. For the
     * record from broadcast input, the output key comes from the keyed partition itself instead of
     * the new key selector, so it is already safe.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <K, T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T_OTHER, T, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);
}

Similar to sources, we only support sinkV2-based sinks.

This FLIP needs to move the following classes from flink-core into the flink-core-api module:

Class Full Path

org.apache.flink.api.common.functions.Function

org.apache.flink.api.java.functions.KeySelector

Compatibility, Deprecation, and Migration Plan

...