...

Process Function                          | Number of inputs | Number of outputs
------------------------------------------|------------------|------------------
OneInputStreamProcessFunction             | 1                | 1
TwoInputNonBroadcastStreamProcessFunction | 2                | 1
TwoInputBroadcastStreamProcessFunction    | 2                | 1
TwoOutputStreamProcessFunction            | 1                | 2

...

The two-input case is special, and we divide it into two categories:

  1. TwoInputNonBroadcastStreamProcessFunction: Neither of its inputs is a BroadcastStream, so the processing of each input applies only to a single partition.
  2. TwoInputBroadcastStreamProcessFunction: One of its inputs is a BroadcastStream, so the processing of that input applies to all partitions. The other input is a keyed or non-keyed stream, and its processing applies to a single partition.

...

Code Block
languagejava
titleCollector.java
/** This interface is responsible for collecting data into the output stream. */
public interface Collector<OUT> {
    /**
     * Collect record to output stream.
     *
     * @param record to be collected.
     */
    void collect(OUT record);      
    
    /**
     * Overwrite the timestamp of this record and collect it to output stream.
     *
     * @param record to be collected.
     * @param timestamp of the processed data.
     */
    void collectAndOverwriteTimestamp(OUT record, long timestamp); 
}
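As a minimal sketch of how the Collector contract might be implemented, for example to unit-test a process function, the following buffers every emitted record in memory. The interface is re-declared locally so the snippet compiles on its own. ListCollector and TimestampedRecord are hypothetical names, and representing "timestamp not overwritten" as null is an assumption of this sketch, not part of the proposal.

```java
import java.util.ArrayList;
import java.util.List;

// Local copy of the proposed interface so the sketch is self-contained.
interface Collector<OUT> {
    void collect(OUT record);

    void collectAndOverwriteTimestamp(OUT record, long timestamp);
}

// Hypothetical pair of a record and the timestamp it was emitted with (null = not overwritten).
final class TimestampedRecord<T> {
    final T value;
    final Long timestamp;

    TimestampedRecord(T value, Long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical test helper: keeps everything it receives in emission order.
final class ListCollector<OUT> implements Collector<OUT> {
    final List<TimestampedRecord<OUT>> records = new ArrayList<>();

    @Override
    public void collect(OUT record) {
        // No override: the record keeps whatever timestamp it already had (unknown here).
        records.add(new TimestampedRecord<>(record, null));
    }

    @Override
    public void collectAndOverwriteTimestamp(OUT record, long timestamp) {
        records.add(new TimestampedRecord<>(record, timestamp));
    }
}
```

Passing a ListCollector wherever a Collector is expected lets a test inspect both the emitted values and any overwritten timestamps.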

...

NonPartitionedContext

Sometimes the current partition cannot be determined from the processing context, for example when processing input records from the broadcast edge, or when an input has ended. Therefore, we introduce NonPartitionedContext, a mechanism for applying an operation to all partitions.

Note: RuntimeContext contains information about the context in which process functions are executed. It is currently an empty interface, but it will be expanded later, for example to support state access and timer registration. This part will be elaborated in subsequent sub-FLIPs.

Code Block
languagejava
titleNonPartitionedContext.java
/**
 * This interface represents the context associated with all operations that must be applied to all
 * partitions.
 */
public interface NonPartitionedContext<OUT> extends RuntimeContext {
    /**
     * Apply a function to all partitions. For a keyed stream, it will apply to all keys. For a
     * non-keyed stream, it will apply to a single partition.
     */
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}

/** A function to be applied to all partitions. */
@FunctionalInterface
public interface ApplyPartitionFunction<OUT> {
    /**
     * The actual method to be applied to each partition.
     *
     * @param collector to output data.
     * @param ctx runtime context in which this function is executed.
     */
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}

/**
 * This interface represents the context associated with all operations that must be applied to all
 * partitions with two outputs.
 */
public interface TwoOutputNonPartitionedContext<OUT1, OUT2> extends RuntimeContext {
    /**
     * Apply a function to all partitions. For a keyed stream, it will apply to all keys. For a
     * non-keyed stream, it will apply to a single partition.
     */
    void applyToAllPartitions(TwoOutputApplyPartitionFunction<OUT1, OUT2> applyPartitionFunction);
}

/** A function to be applied to all partitions with two outputs. */
@FunctionalInterface
public interface TwoOutputApplyPartitionFunction<OUT1, OUT2> {
    /**
     * The actual method to be applied to each partition.
     *
     * @param firstOutput to emit record to the first output.
     * @param secondOutput to emit record to the second output.
     * @param ctx runtime context in which this function is executed.
     */
    void apply(Collector<OUT1> firstOutput, Collector<OUT2> secondOutput, RuntimeContext ctx)
            throws Exception;
}
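To illustrate the intended semantics for a keyed stream, where "all partitions" means all keys, here is a minimal in-memory sketch in which applyToAllPartitions invokes the function once per key with a collector scoped to that key. The interfaces are simplified local copies. InMemoryKeyedContext and KeyContext are hypothetical, and because RuntimeContext is still empty in this proposal, exposing the current key through KeyContext is purely an assumption of the sketch. For a non-keyed stream the same method would invoke the function exactly once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified local copies of the proposed interfaces, so the sketch compiles on its own.
interface RuntimeContext {}

interface Collector<OUT> {
    void collect(OUT record);

    void collectAndOverwriteTimestamp(OUT record, long timestamp);
}

@FunctionalInterface
interface ApplyPartitionFunction<OUT> {
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}

interface NonPartitionedContext<OUT> extends RuntimeContext {
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}

// Hypothetical: RuntimeContext is empty in the proposal, so this sketch-only subtype
// is how the applied function learns which key (partition) it is running for.
final class KeyContext implements RuntimeContext {
    final String key;

    KeyContext(String key) {
        this.key = key;
    }
}

// Hypothetical in-memory context for a keyed stream: "all partitions" means all known keys.
final class InMemoryKeyedContext<OUT> implements NonPartitionedContext<OUT> {
    private final List<String> keys;
    final Map<String, List<OUT>> outputPerKey = new TreeMap<>();

    InMemoryKeyedContext(List<String> keys) {
        this.keys = keys;
    }

    @Override
    public void applyToAllPartitions(ApplyPartitionFunction<OUT> fn) {
        for (String key : keys) {
            List<OUT> buffer = outputPerKey.computeIfAbsent(key, k -> new ArrayList<>());
            Collector<OUT> collector =
                    new Collector<OUT>() {
                        @Override
                        public void collect(OUT record) {
                            buffer.add(record);
                        }

                        @Override
                        public void collectAndOverwriteTimestamp(OUT record, long timestamp) {
                            buffer.add(record); // timestamps are ignored in this sketch
                        }
                    };
            try {
                // Invoke the user function once per key, with a collector scoped to that key.
                fn.apply(collector, new KeyContext(key));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}
```

With keys "a" and "b", a call such as ctx.applyToAllPartitions((out, rc) -> out.collect("hello-" + ((KeyContext) rc).key)) yields one record per key.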

OneInputStreamProcessFunction

Code Block
languagejava
titleOneInputStreamProcessFunction.java
/** This class contains all logic related to processing records from a single input. */
public interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction {
    /**
     * Process a record and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecord(IN record, Collector<OUT> output, RuntimeContext ctx) throws Exception;

    /**
     * This is a life-cycle method indicating that this function will no longer receive any input
     * data.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endInput(NonPartitionedContext<OUT> ctx) {}
}
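A minimal sketch of implementing this interface: a word tokenizer plus a tiny single-partition driver standing in for the runtime engine. The interfaces are simplified local copies; Tokenizer and LocalRunner are hypothetical names, and the driver's behavior (call processRecord for every record, then endInput once) is our assumption about how the runtime would sequence the calls, not a description of the actual engine.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified local copies of the proposed interfaces.
interface ProcessFunction {}

interface RuntimeContext {}

interface Collector<OUT> {
    void collect(OUT record);

    void collectAndOverwriteTimestamp(OUT record, long timestamp);
}

@FunctionalInterface
interface ApplyPartitionFunction<OUT> {
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}

interface NonPartitionedContext<OUT> extends RuntimeContext {
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}

interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction {
    void processRecord(IN record, Collector<OUT> output, RuntimeContext ctx) throws Exception;

    default void endInput(NonPartitionedContext<OUT> ctx) {}
}

// Example user function: split each line into words (the default endInput is kept).
final class Tokenizer implements OneInputStreamProcessFunction<String, String> {
    @Override
    public void processRecord(String line, Collector<String> output, RuntimeContext ctx) {
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                output.collect(word);
            }
        }
    }
}

// Hypothetical single-partition driver standing in for the runtime engine.
final class LocalRunner {
    static <IN, OUT> List<OUT> run(OneInputStreamProcessFunction<IN, OUT> fn, List<IN> input) {
        List<OUT> results = new ArrayList<>();
        Collector<OUT> collector =
                new Collector<OUT>() {
                    @Override
                    public void collect(OUT record) {
                        results.add(record);
                    }

                    @Override
                    public void collectAndOverwriteTimestamp(OUT record, long timestamp) {
                        results.add(record); // timestamps are ignored in this sketch
                    }
                };
        RuntimeContext ctx = new RuntimeContext() {};
        try {
            for (IN record : input) {
                fn.processRecord(record, collector, ctx);
            }
            // With a single partition, "all partitions" is just one invocation.
            fn.endInput(
                    applyFn -> {
                        try {
                            applyFn.apply(collector, ctx);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    });
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return results;
    }
}
```

LocalRunner.run(new Tokenizer(), List.of("to be", " or not ")) returns the words in input order.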

...

TwoInputNonBroadcastStreamProcessFunction

Code Block
languagejava
titleTwoInputNonBroadcastStreamProcessFunction.java
/** This class contains all logic related to processing records from two non-broadcast inputs. */
public interface TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    /**
     * Process record from the first input and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecordFromFirstInput(IN1 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    /**
     * Process record from the second input and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecordFromSecondInput(IN2 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    /**
     * This is a life-cycle method indicating that this function will no longer receive any data
     * from the first input. This allows the ProcessFunction to emit results at once rather than
     * upon each record.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endFirstInput(NonPartitionedContext<OUT> ctx) {}

    /**
     * This is a life-cycle method indicating that this function will no longer receive any data
     * from the second input.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endSecondInput(NonPartitionedContext<OUT> ctx) {}
}
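The per-input end hooks let a function wait until both inputs are exhausted, whichever ends first. The sketch below joins user names from the first input with order amounts from the second and emits only after both end callbacks have fired. The interfaces are simplified local copies; NameOrderJoin and SinglePartitionContext are hypothetical, Map.Entry is used as the record type only for brevity, and keeping buffers in plain fields is an illustration, since state access is deferred to later sub-FLIPs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified local copies of the proposed interfaces.
interface ProcessFunction {}

interface RuntimeContext {}

interface Collector<OUT> {
    void collect(OUT record);

    void collectAndOverwriteTimestamp(OUT record, long timestamp);
}

@FunctionalInterface
interface ApplyPartitionFunction<OUT> {
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}

interface NonPartitionedContext<OUT> extends RuntimeContext {
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}

interface TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    void processRecordFromFirstInput(IN1 record, Collector<OUT> output, RuntimeContext ctx) throws Exception;

    void processRecordFromSecondInput(IN2 record, Collector<OUT> output, RuntimeContext ctx) throws Exception;

    default void endFirstInput(NonPartitionedContext<OUT> ctx) {}

    default void endSecondInput(NonPartitionedContext<OUT> ctx) {}
}

// Hypothetical one-partition test context: it is its own collector and runtime context.
final class SinglePartitionContext<OUT> implements NonPartitionedContext<OUT>, Collector<OUT> {
    final List<OUT> emitted = new ArrayList<>();

    @Override
    public void collect(OUT record) {
        emitted.add(record);
    }

    @Override
    public void collectAndOverwriteTimestamp(OUT record, long timestamp) {
        emitted.add(record); // timestamps are ignored in this sketch
    }

    @Override
    public void applyToAllPartitions(ApplyPartitionFunction<OUT> fn) {
        try {
            fn.apply(this, this);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

// Example user function: joins names (first input) with orders (second input) once BOTH inputs end.
final class NameOrderJoin
        implements TwoInputNonBroadcastStreamProcessFunction<
                Map.Entry<Integer, String>, Map.Entry<Integer, Double>, String> {
    private final Map<Integer, String> names = new HashMap<>();
    private final List<Map.Entry<Integer, Double>> orders = new ArrayList<>();
    private boolean firstEnded;
    private boolean secondEnded;

    @Override
    public void processRecordFromFirstInput(Map.Entry<Integer, String> user, Collector<String> out, RuntimeContext ctx) {
        names.put(user.getKey(), user.getValue()); // buffer only; nothing emitted yet
    }

    @Override
    public void processRecordFromSecondInput(Map.Entry<Integer, Double> order, Collector<String> out, RuntimeContext ctx) {
        orders.add(order); // buffer only; nothing emitted yet
    }

    @Override
    public void endFirstInput(NonPartitionedContext<String> ctx) {
        firstEnded = true;
        emitIfBothEnded(ctx);
    }

    @Override
    public void endSecondInput(NonPartitionedContext<String> ctx) {
        secondEnded = true;
        emitIfBothEnded(ctx);
    }

    private void emitIfBothEnded(NonPartitionedContext<String> ctx) {
        if (!firstEnded || !secondEnded) {
            return;
        }
        ctx.applyToAllPartitions((out, rc) -> {
            for (Map.Entry<Integer, Double> order : orders) {
                String name = names.get(order.getKey());
                if (name != null) { // orders without a matching user are dropped
                    out.collect(name + ":" + order.getValue());
                }
            }
        });
    }
}
```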


TwoInputBroadcastStreamProcessFunction

Code Block
languagejava
titleTwoInputBroadcastStreamProcessFunction.java
/**
 * This class contains all logic related to processing records from a broadcast stream and a
 * non-broadcast stream.
 */
public interface TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    /**
     * Process record from the non-broadcast input and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecordFromNonBroadcastInput(IN1 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    /**
     * Process record from the broadcast input. In general, the broadcast side is not allowed to
     * manipulate state and output data because it corresponds to all partitions instead of a single
     * partition. But you can use the {@link NonPartitionedContext} to process all the partitions at
     * once.
     *
     * @param record to process.
     * @param ctx the context in which this function is executed.
     */
    void processRecordFromBroadcastInput(IN2 record, NonPartitionedContext<OUT> ctx) throws Exception;

    /**
     * This is a life-cycle method indicating that this function will no longer receive any data
     * from the non-broadcast input.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endNonBroadcastInput(NonPartitionedContext<OUT> ctx) {}

    /**
     * This is a life-cycle method indicating that this function will no longer receive any data
     * from the broadcast input.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endBroadcastInput(NonPartitionedContext<OUT> ctx) {}
}
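The asymmetry between the two sides is visible in the signatures: the non-broadcast side receives a Collector, while the broadcast side receives only a NonPartitionedContext, so any output it produces must go through applyToAllPartitions. The sketch below filters numbers against a threshold that arrives on the broadcast input. The interfaces are simplified local copies; ThresholdFilter and SinglePartitionContext are hypothetical, and holding the threshold in a plain instance field is purely illustrative. This proposal does not settle how (or whether) the broadcast side may keep such data, since state access is deferred to later sub-FLIPs.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified local copies of the proposed interfaces.
interface ProcessFunction {}

interface RuntimeContext {}

interface Collector<OUT> {
    void collect(OUT record);

    void collectAndOverwriteTimestamp(OUT record, long timestamp);
}

@FunctionalInterface
interface ApplyPartitionFunction<OUT> {
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}

interface NonPartitionedContext<OUT> extends RuntimeContext {
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}

interface TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    void processRecordFromNonBroadcastInput(IN1 record, Collector<OUT> output, RuntimeContext ctx) throws Exception;

    void processRecordFromBroadcastInput(IN2 record, NonPartitionedContext<OUT> ctx) throws Exception;

    default void endNonBroadcastInput(NonPartitionedContext<OUT> ctx) {}

    default void endBroadcastInput(NonPartitionedContext<OUT> ctx) {}
}

// Hypothetical one-partition test context: it is its own collector and runtime context.
final class SinglePartitionContext<OUT> implements NonPartitionedContext<OUT>, Collector<OUT> {
    final List<OUT> emitted = new ArrayList<>();

    @Override
    public void collect(OUT record) {
        emitted.add(record);
    }

    @Override
    public void collectAndOverwriteTimestamp(OUT record, long timestamp) {
        emitted.add(record); // timestamps are ignored in this sketch
    }

    @Override
    public void applyToAllPartitions(ApplyPartitionFunction<OUT> fn) {
        try {
            fn.apply(this, this);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

// Example user function: pass through values that reach the latest broadcast threshold.
final class ThresholdFilter implements TwoInputBroadcastStreamProcessFunction<Integer, Integer, String> {
    // Plain instance field for illustration only (see the lead-in); every parallel
    // instance receives each broadcast record, so each instance updates its own copy.
    private int threshold = Integer.MIN_VALUE;

    @Override
    public void processRecordFromNonBroadcastInput(Integer value, Collector<String> output, RuntimeContext ctx) {
        if (value >= threshold) {
            output.collect("value=" + value);
        }
    }

    @Override
    public void processRecordFromBroadcastInput(Integer newThreshold, NonPartitionedContext<String> ctx) {
        threshold = newThreshold;
        // No Collector parameter on this side: output goes through applyToAllPartitions.
        ctx.applyToAllPartitions((output, rc) -> output.collect("threshold=" + newThreshold));
    }
}
```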

TwoOutputStreamProcessFunction

Code Block
languagejava
titleTwoOutputStreamProcessFunction.java
/** This class contains all logic related to processing records and emitting them to two outputs. */
public interface TwoOutputStreamProcessFunction<IN, OUT1, OUT2> extends ProcessFunction {
    /**
     * Process and emit record to the first/second output through {@link Collector}s.
     *
     * @param record to process.
     * @param output1 to emit processed records to the first output.
     * @param output2 to emit processed records to the second output.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecord(
            IN record, Collector<OUT1> output1, Collector<OUT2> output2, RuntimeContext ctx);

    /**
     * This is a life-cycle method indicating that this function will no longer receive any input
     * data.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endInput(TwoOutputNonPartitionedContext<OUT1, OUT2> ctx) {}
}
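A minimal sketch of a two-output function: even numbers go to the first output, odd numbers to the second, and once the input ends, per-output totals are emitted through TwoOutputNonPartitionedContext. The interfaces are simplified local copies; EvenOddSplitter and TwoOutputLocalContext are hypothetical names, and the counters are plain fields used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified local copies of the proposed interfaces.
interface ProcessFunction {}

interface RuntimeContext {}

interface Collector<OUT> {
    void collect(OUT record);

    void collectAndOverwriteTimestamp(OUT record, long timestamp);
}

@FunctionalInterface
interface TwoOutputApplyPartitionFunction<OUT1, OUT2> {
    void apply(Collector<OUT1> firstOutput, Collector<OUT2> secondOutput, RuntimeContext ctx) throws Exception;
}

interface TwoOutputNonPartitionedContext<OUT1, OUT2> extends RuntimeContext {
    void applyToAllPartitions(TwoOutputApplyPartitionFunction<OUT1, OUT2> applyPartitionFunction);
}

interface TwoOutputStreamProcessFunction<IN, OUT1, OUT2> extends ProcessFunction {
    void processRecord(IN record, Collector<OUT1> output1, Collector<OUT2> output2, RuntimeContext ctx);

    default void endInput(TwoOutputNonPartitionedContext<OUT1, OUT2> ctx) {}
}

// Hypothetical single-partition context backed by two in-memory lists.
final class TwoOutputLocalContext<O1, O2> implements TwoOutputNonPartitionedContext<O1, O2> {
    final List<O1> first = new ArrayList<>();
    final List<O2> second = new ArrayList<>();
    final Collector<O1> firstCollector = listCollector(first);
    final Collector<O2> secondCollector = listCollector(second);

    private static <T> Collector<T> listCollector(List<T> sink) {
        return new Collector<T>() {
            @Override
            public void collect(T record) {
                sink.add(record);
            }

            @Override
            public void collectAndOverwriteTimestamp(T record, long timestamp) {
                sink.add(record); // timestamps are ignored in this sketch
            }
        };
    }

    @Override
    public void applyToAllPartitions(TwoOutputApplyPartitionFunction<O1, O2> fn) {
        try {
            fn.apply(firstCollector, secondCollector, this);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

// Example user function: route evens to output 1 and odds to output 2, then emit totals.
final class EvenOddSplitter implements TwoOutputStreamProcessFunction<Integer, String, String> {
    private int evens;
    private int odds;

    @Override
    public void processRecord(Integer value, Collector<String> evenOut, Collector<String> oddOut, RuntimeContext ctx) {
        if (value % 2 == 0) {
            evens++;
            evenOut.collect("even:" + value);
        } else {
            odds++;
            oddOut.collect("odd:" + value);
        }
    }

    @Override
    public void endInput(TwoOutputNonPartitionedContext<String, String> ctx) {
        ctx.applyToAllPartitions((evenOut, oddOut, rc) -> {
            evenOut.collect("evens=" + evens);
            oddOut.collect("odds=" + odds);
        });
    }
}
```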

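Each process function provides life-cycle hooks that signal the end of an input: endInput, or its per-input variants (endFirstInput/endSecondInput and endNonBroadcastInput/endBroadcastInput) for two-input functions. The runtime engine calls such a method after all data of the corresponding input has been processed, which gives the function a final opportunity to send data downstream. This is crucial for implementing operations such as a full-aggregation window.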
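As a sketch of the full-aggregation pattern, the function below emits nothing per record and produces a single sum only when endInput is called. The interfaces are simplified local copies; SumAll and SinglePartitionContext are hypothetical, a single partition is assumed, and the running sum lives in a plain field because state access is deferred to later sub-FLIPs.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified local copies of the proposed interfaces.
interface ProcessFunction {}

interface RuntimeContext {}

interface Collector<OUT> {
    void collect(OUT record);

    void collectAndOverwriteTimestamp(OUT record, long timestamp);
}

@FunctionalInterface
interface ApplyPartitionFunction<OUT> {
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}

interface NonPartitionedContext<OUT> extends RuntimeContext {
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}

interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction {
    void processRecord(IN record, Collector<OUT> output, RuntimeContext ctx) throws Exception;

    default void endInput(NonPartitionedContext<OUT> ctx) {}
}

// Hypothetical one-partition test context: it is its own collector and runtime context.
final class SinglePartitionContext<OUT> implements NonPartitionedContext<OUT>, Collector<OUT> {
    final List<OUT> emitted = new ArrayList<>();

    @Override
    public void collect(OUT record) {
        emitted.add(record);
    }

    @Override
    public void collectAndOverwriteTimestamp(OUT record, long timestamp) {
        emitted.add(record); // timestamps are ignored in this sketch
    }

    @Override
    public void applyToAllPartitions(ApplyPartitionFunction<OUT> fn) {
        try {
            fn.apply(this, this);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

// Example user function: sum over the whole input, emitted once at end of input.
final class SumAll implements OneInputStreamProcessFunction<Long, Long> {
    private long sum; // plain field; managed state comes with later sub-FLIPs

    @Override
    public void processRecord(Long value, Collector<Long> output, RuntimeContext ctx) {
        sum += value; // accumulate only; nothing is emitted per record
    }

    @Override
    public void endInput(NonPartitionedContext<Long> ctx) {
        long total = sum;
        ctx.applyToAllPartitions((output, rc) -> output.collect(total));
    }
}
```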

DataStreams

In general, we will expose four types of DataStream interfaces to users (NonKeyedPartitionStream, KeyedPartitionStream, GlobalStream and BroadcastStream); partitioning and processing can be applied to these data streams.

NonKeyedPartitionStream

Code Block
languagejava
titleNonKeyedPartitionStream.java
/**
 * This class represents a kind of partitioned data stream. For this stream, each parallel instance
 * is a partition, and the partition to which the data belongs is random.
 */
public interface NonKeyedPartitionStream<T> {
    /**
     * Apply an operation to this {@link NonKeyedPartitionStream}.
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <OUT> NonKeyedPartitionStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two output operation to this {@link NonKeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @return new stream with this operation.
     */
    <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply a two input operation to this and other {@link NonKeyedPartitionStream}.
     *
     * @param other {@link NonKeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            NonKeyedPartitionStream<T_OTHER> other,
            TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link BroadcastStream}.
     *
     * @param other {@link BroadcastStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            BroadcastStream<T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Coalesce this stream to a {@link GlobalStream}.
     *
     * @return the coalesced global stream.
     */
    GlobalStream<T> global();

    /**
     * Transform this stream to a {@link KeyedPartitionStream}.
     *
     * @param keySelector to decide how to map data to partition.
     * @return the transformed stream partitioned by key.
     */
    <K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector);

    /**
     * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between
     * these two streams.
     *
     * @return the transformed stream after shuffle.
     */
    NonKeyedPartitionStream<T> shuffle();

    /**
     * Transform this stream to a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    /**
     * Sink data from this stream.
     *
     * @param sink to receive data from this stream.
     */
    void toSink(Sink<T> sink);

    /**
     * This class represents a combination of two {@link NonKeyedPartitionStream}s. It will be used
     * as the return value of an operation with two outputs.
     */
    interface TwoNonKeyedPartitionStreams<T1, T2> {
        /** Get the first stream. */
        NonKeyedPartitionStream<T1> getFirst();

        /** Get the second stream. */
        NonKeyedPartitionStream<T2> getSecond();
    }
}

KeyedPartitionStream

Code Block
languagejava
titleKeyedPartitionStream.java
/**
 * This class represents a kind of partitioned data stream. For this stream, each key group is a
 * partition, and the partition to which the data belongs is determined by its key.
 */
public interface KeyedPartitionStream<K, T> {
    /**
     * Apply an operation to this {@link KeyedPartitionStream}.
     *
     * <p>This method is used to avoid a shuffle after applying the process function. It is
     * required that, for the same record, the new {@link KeySelector} extracts the same key as the
     * original {@link KeySelector} on this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <OUT> KeyedPartitionStream<K, OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);

    /**
     * Apply an operation to this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform operation.
     * @return new {@link NonKeyedPartitionStream} with this operation.
     */
    <OUT> NonKeyedPartitionStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two output operation to this {@link KeyedPartitionStream}.
     *
     * <p>This method is used to avoid a shuffle after applying the process function. It is
     * required that, for the same record, the two new {@link KeySelector}s extract the same key as
     * the original {@link KeySelector} on this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @param keySelector1 to select the key of the first output.
     * @param keySelector2 to select the key of the second output.
     * @return new {@link TwoKeyedPartitionStreams} with this operation.
     */
    <OUT1, OUT2> TwoKeyedPartitionStreams<K, OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction,
            KeySelector<OUT1, K> keySelector1,
            KeySelector<OUT2, K> keySelector2);

    /**
     * Apply a two output operation to this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @return new {@link TwoNonKeyedPartitionStreams} with this operation.
     */
    <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}. The two keyed
     * streams must have the same partitions, otherwise it makes no sense to connect them.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new {@link NonKeyedPartitionStream} with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}. The two keyed
     * streams must have the same partitions, otherwise it makes no sense to connect them.
     *
     * <p>This method is used to avoid a shuffle after applying the process function. It is
     * required that, for the same record, the new {@link KeySelector} extracts the same key as the
     * original {@link KeySelector}s on these two {@link KeyedPartitionStream}s.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);

    /**
     * Apply a two input operation to this and other {@link BroadcastStream}.
     *
     * @param other {@link BroadcastStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            BroadcastStream<T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link BroadcastStream}.
     *
     * <p>This method is used to avoid a shuffle after applying the process function. It is
     * required that, for a record from the non-broadcast input, the new {@link KeySelector}
     * extracts the same key as the original {@link KeySelector} on the {@link
     * KeyedPartitionStream}. For a record from the broadcast input, the output key comes from the
     * keyed partition itself rather than from the new key selector, so it is already safe.
     *
     * @param other {@link BroadcastStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
            BroadcastStream<T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);

    /**
     * Coalesce this stream to a {@link GlobalStream}.
     *
     * @return the coalesced global stream.
     */
    GlobalStream<T> global();

    /**
     * Transform this stream to a new {@link KeyedPartitionStream}.
     *
     * @param keySelector to decide how to map data to partition.
     * @return the transformed stream partitioned by key.
     */
    <NEW_KEY> KeyedPartitionStream<NEW_KEY, T> keyBy(KeySelector<T, NEW_KEY> keySelector);

    /**
     * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between
     * these two streams.
     *
     * @return the transformed stream after shuffle.
     */
    NonKeyedPartitionStream<T> shuffle();

    /**
     * Transform this stream to a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    /**
     * Sink data from this stream.
     *
     * @param sink to receive data from this stream.
     */
    void toSink(Sink<T> sink);

    /**
     * This class represents a combination of two {@link KeyedPartitionStream}s. It will be used as
     * the return value of an operation with two outputs.
     */
    interface TwoKeyedPartitionStreams<K, T1, T2> {
        /** Get the first stream. */
        KeyedPartitionStream<K, T1> getFirst();

        /** Get the second stream. */
        KeyedPartitionStream<K, T2> getSecond();
    }
}
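The no-shuffle contract of the key-preserving process and connectAndProcess variants can be made concrete with a small check. In this sketch, partitionOf is a simplified stand-in for the runtime's key-to-partition mapping (Flink's runtime hashes keys into key groups and maps key groups to parallel instances; plain hashCode modulo parallelism is our simplification), and java.util.function.Function stands in for KeySelector. KeyPartitioning and keyPreserved are hypothetical names: keyPreserved tests, for one input record and its outputs, that the new selector yields the original key, which is what lets the outputs stay in the same partition.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical helpers for illustration; java.util.function.Function stands in for KeySelector.
final class KeyPartitioning {
    private KeyPartitioning() {}

    // Simplified stand-in for the runtime's key-to-partition mapping: the same key always
    // lands in the same partition (the real assignment goes through key groups).
    static int partitionOf(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // True if every output's key (per newKeySelector) equals the input's key (per
    // originalKeySelector), i.e. the outputs may stay in the input's partition without a shuffle.
    static <IN, OUT, K> boolean keyPreserved(
            IN input,
            List<OUT> outputs,
            Function<IN, K> originalKeySelector,
            Function<OUT, K> newKeySelector) {
        K originalKey = originalKeySelector.apply(input);
        for (OUT output : outputs) {
            if (!originalKey.equals(newKeySelector.apply(output))) {
                return false;
            }
        }
        return true;
    }
}
```

For example, with events keyed by user as "alice:click" and results formatted as "alice=1", a selector that extracts "alice" from both satisfies the contract, whereas emitting "bob=1" for Alice's event would violate it.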

GlobalStream

Code Block
languagejava
titleGlobalStream.java
/** This class represents a stream that is forced to have a parallelism of 1. */
public interface GlobalStream<T> {
    /**
     * Apply an operation to this {@link GlobalStream}.
     *
     * @param processFunction to perform the operation.
     * @return new stream with this operation.
     */
    <OUT> GlobalStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two-output operation to this {@link GlobalStream}.
     *
     * @param processFunction to perform the two-output operation.
     * @return new stream with this operation.
     */
    <OUT1, OUT2> TwoGlobalStream<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply a two-input operation to this and the other {@link GlobalStream}.
     *
     * @param other {@link GlobalStream} to perform the two-input operation with.
     * @param processFunction to perform the operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> GlobalStream<OUT> connectAndProcess(
            GlobalStream<T_OTHER> other,
            TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Transform this stream to a {@link KeyedPartitionStream}.
     *
     * @param keySelector to decide how to map data to partitions.
     * @return the transformed stream partitioned by key.
     */
    <K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector);

    /**
     * Transform this stream to a new {@link NonKeyedPartitionStream}; data will be shuffled between
     * these two streams.
     *
     * @return the transformed stream after shuffle.
     */
    NonKeyedPartitionStream<T> shuffle();

    /**
     * Transform this stream to a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    /**
     * Sink data from this stream.
     *
     * @param sink to receive data from this stream.
     */
    void toSink(Sink<T> sink);

    /**
     * This class represents a combination of two {@link GlobalStream}s. It is used as the return
     * value of a two-output operation.
     */
    interface TwoGlobalStream<T1, T2> {
        /** Get the first stream. */
        GlobalStream<T1> getFirst();

        /** Get the second stream. */
        GlobalStream<T2> getSecond();
    }
}
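The partitioning transformations declared on `GlobalStream` can be contrasted with a simplified model of where records land for a given downstream parallelism. `PartitioningSketch` and its methods are hypothetical. Two parts of the model are simplifying assumptions rather than Flink's actual strategies: keys are assigned to tasks by `hashCode()` modulo the parallelism (Flink, for instance, assigns keys through key groups), and shuffle uses round-robin.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Simplified routing model; not Flink's runtime partitioners. */
public class PartitioningSketch {

    static <T> List<List<T>> emptyTasks(int parallelism) {
        List<List<T>> tasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            tasks.add(new ArrayList<>());
        }
        return tasks;
    }

    /** GlobalStream: a single task (index 0) sees every record. */
    static <T> List<List<T>> toGlobal(List<T> records) {
        List<List<T>> tasks = emptyTasks(1);
        tasks.get(0).addAll(records);
        return tasks;
    }

    /** keyBy: records with equal keys always land on the same task. */
    static <T, K> List<List<T>> keyBy(List<T> records, Function<T, K> keySelector, int parallelism) {
        List<List<T>> tasks = emptyTasks(parallelism);
        for (T record : records) {
            int task = Math.floorMod(keySelector.apply(record).hashCode(), parallelism);
            tasks.get(task).add(record);
        }
        return tasks;
    }

    /** shuffle: records are spread over tasks regardless of any key (round-robin here). */
    static <T> List<List<T>> shuffle(List<T> records, int parallelism) {
        List<List<T>> tasks = emptyTasks(parallelism);
        for (int i = 0; i < records.size(); i++) {
            tasks.get(i % parallelism).add(records.get(i));
        }
        return tasks;
    }

    /** broadcast: every task receives every record. */
    static <T> List<List<T>> broadcast(List<T> records, int parallelism) {
        List<List<T>> tasks = emptyTasks(parallelism);
        for (List<T> task : tasks) {
            task.addAll(records);
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<Integer> records = List.of(1, 2, 3, 4, 5, 6);
        System.out.println(toGlobal(records));             // [[1, 2, 3, 4, 5, 6]]
        System.out.println(keyBy(records, r -> r % 2, 2)); // [[2, 4, 6], [1, 3, 5]]
        System.out.println(shuffle(records, 3));           // [[1, 4], [2, 5], [3, 6]]
        System.out.println(broadcast(List.of(1, 2), 2));   // [[1, 2], [1, 2]]
    }
}
```

The property that matters for keyed processing is only that equal keys map to the same task. Any deterministic assignment has that property, which is why the sketch can use a plain hash.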

BroadcastStream

Code Block
languagejava
titleBroadcastStream.java
/** This class represents a stream in which each parallel task processes the same data. */
public interface BroadcastStream<T> {
    /**
     * Apply a two-input operation to this and the other {@link KeyedPartitionStream}.
     *
     * @param other {@link KeyedPartitionStream} to perform the two-input operation with.
     * @param processFunction to perform the operation.
     * @return new stream with this operation.
     */
    <K, T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two-input operation to this and the other {@link NonKeyedPartitionStream}.
     *
     * @param other {@link NonKeyedPartitionStream} to perform the two-input operation with.
     * @param processFunction to perform the operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            NonKeyedPartitionStream<T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two-input operation to this and the other {@link KeyedPartitionStream}.
     *
     * <p>This method is used to avoid a shuffle after applying the process function. For records
     * from the non-broadcast input, the new {@link KeySelector} must extract the same key as the
     * original {@link KeySelector} of the {@link KeyedPartitionStream}. For records from the
     * broadcast input, the output key is taken from the keyed partition itself rather than from
     * the new key selector, so it is already safe.
     *
     * @param other {@link KeyedPartitionStream} to perform the two-input operation with.
     * @param processFunction to perform the operation.
     * @param newKeySelector to select the key after processing.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <K, T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);
}
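The key-preservation requirement of the last `connectAndProcess` overload can be checked mechanically. The sketch below is a hypothetical in-memory model, and `KeyPreservingSketch`, `processKeyed`, and `processBroadcast` are illustrative names, not Flink APIs. Each non-broadcast record is processed in its own key partition, and every output must map back to that key through `newKeySelector`. Outputs triggered by a broadcast record are filed under the key of whichever partition produced them, so no key selector is consulted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Hypothetical in-memory model of the newKeySelector contract; not a Flink API. */
public class KeyPreservingSketch {

    /**
     * Processes each non-broadcast record inside the partition of its original key and checks
     * that newKeySelector maps every output back to that same key, so no re-shuffle is needed.
     */
    static <K, IN, OUT> Map<K, List<OUT>> processKeyed(
            List<IN> records,
            Function<IN, K> originalKeySelector,
            Function<IN, OUT> fn,
            Function<OUT, K> newKeySelector) {
        Map<K, List<OUT>> partitions = new LinkedHashMap<>();
        for (IN record : records) {
            K key = originalKeySelector.apply(record);
            OUT out = fn.apply(record);
            K newKey = newKeySelector.apply(out);
            if (!key.equals(newKey)) {
                // The output would belong to another partition, which would require a shuffle.
                throw new IllegalStateException(
                        "output " + out + " has key " + newKey + " but was produced in partition " + key);
            }
            partitions.computeIfAbsent(key, k -> new ArrayList<>()).add(out);
        }
        return partitions;
    }

    /**
     * A broadcast record reaches every partition; each output is filed under the key of the
     * partition that produced it, so no key selector is involved.
     */
    static <K, B, OUT> Map<K, List<OUT>> processBroadcast(
            B broadcastRecord, List<K> partitionKeys, BiFunction<K, B, OUT> fn) {
        Map<K, List<OUT>> partitions = new LinkedHashMap<>();
        for (K key : partitionKeys) {
            partitions.computeIfAbsent(key, k -> new ArrayList<>()).add(fn.apply(key, broadcastRecord));
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Orders are "user:amount" strings keyed by user.
        List<String> orders = List.of("alice:3", "bob:5", "alice:4");
        Function<String, String> byUser = o -> o.split(":")[0];
        // Doubling the amount keeps the "user:" prefix, so the key is preserved.
        Function<String, String> doubleAmount =
                o -> byUser.apply(o) + ":" + 2 * Integer.parseInt(o.split(":")[1]);
        System.out.println(processKeyed(orders, byUser, doubleAmount, byUser));
        // {alice=[alice:6, alice:8], bob=[bob:10]}

        BiFunction<String, String, String> tagWithRule = (user, rule) -> user + " got " + rule;
        System.out.println(processBroadcast("limit=10", List.of("alice", "bob"), tagWithRule));
        // {alice=[alice got limit=10], bob=[bob got limit=10]}
    }
}
```

Throwing on a mismatch is a choice made for this sketch. The Javadoc above only states the requirement and does not say whether the runtime verifies it.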

Similar to sources, we only support SinkV2-based sinks.

This FLIP needs to move the following classes from the flink-core module into the flink-core-api module:

Class Full Path

org.apache.flink.api.common.functions.Function

org.apache.flink.api.java.functions.KeySelector

Compatibility, Deprecation, and Migration Plan

...