
...

Process Function                          | Number of inputs | Number of outputs
OneInputStreamProcessFunction             | 1                | 1
TwoInputNonBroadcastStreamProcessFunction | 2                | 1
TwoInputBroadcastStreamProcessFunction    | 2                | 1
TwoOutputStreamProcessFunction            | 1                | 2

Logically, process functions with more inputs or outputs can be built by combining these functions, but such an implementation might be inefficient. If demand for this grows, we will consider supporting an arbitrary number of output edges through a mechanism like OutputTag. However, that approach loses the explicit generic type information that ProcessFunction provides.

The two-input case is special, so we divide it into two categories:

  1. TwoInputNonBroadcastStreamProcessFunction: neither input is a BroadcastStream, so processing is applied only to a single partition.
  2. TwoInputBroadcastStreamProcessFunction: one input is a BroadcastStream, so processing of that input is applied to all partitions. The other input is a keyed or non-keyed stream, and its processing is applied to a single partition.

Advantages of ProcessFunction

...

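Output stream type of a two-input operation (rows: Input1, columns: Input2; "-" marks an unsupported combination):

Input1 \ Input2 | Global | Keyed             | NonKeyed  | Broadcast
Global          | Global | -                 | -         | -
Keyed           | -      | Non-Keyed / Keyed | -         | Non-Keyed / Keyed
NonKeyed        | -      | -                 | Non-Keyed | Non-Keyed
Broadcast       | -      | Non-Keyed / Keyed | Non-Keyed | -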
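The table works as a lookup from the two input kinds to the possible output kinds. Below is a minimal sketch that encodes it. The `StreamKind` and `OutputKind` enums and the `TwoInputCompatibility` helper are hypothetical illustrations, not part of the proposed API. The sketch also selects the two-input function category using the broadcast rule described above.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical stream kinds, mirroring the rows and columns of the table. */
enum StreamKind { GLOBAL, KEYED, NON_KEYED, BROADCAST }

/** Hypothetical output kinds that can appear in the table cells. */
enum OutputKind { GLOBAL, KEYED, NON_KEYED }

final class TwoInputCompatibility {
    /** Returns the possible output kinds; an empty set means the combination is unsupported. */
    static Set<OutputKind> outputs(StreamKind in1, StreamKind in2) {
        // A global stream can only be connected with another global stream.
        if (in1 == StreamKind.GLOBAL || in2 == StreamKind.GLOBAL) {
            return in1 == in2 ? EnumSet.of(OutputKind.GLOBAL) : EnumSet.noneOf(OutputKind.class);
        }
        // Two broadcast streams cannot be connected.
        if (in1 == StreamKind.BROADCAST && in2 == StreamKind.BROADCAST) {
            return EnumSet.noneOf(OutputKind.class);
        }
        boolean hasKeyed = in1 == StreamKind.KEYED || in2 == StreamKind.KEYED;
        boolean hasNonKeyed = in1 == StreamKind.NON_KEYED || in2 == StreamKind.NON_KEYED;
        // Keyed and non-keyed streams cannot be connected directly.
        if (hasKeyed && hasNonKeyed) {
            return EnumSet.noneOf(OutputKind.class);
        }
        // Keyed inputs may stay keyed (given a new KeySelector); otherwise the output is non-keyed.
        return hasKeyed
                ? EnumSet.of(OutputKind.NON_KEYED, OutputKind.KEYED)
                : EnumSet.of(OutputKind.NON_KEYED);
    }

    /** Function category for a supported combination: broadcast if either input is broadcast. */
    static String functionFor(StreamKind in1, StreamKind in2) {
        return (in1 == StreamKind.BROADCAST || in2 == StreamKind.BROADCAST)
                ? "TwoInputBroadcastStreamProcessFunction"
                : "TwoInputNonBroadcastStreamProcessFunction";
    }
}
```

A `KEYED` result corresponds to the API overloads that take a `newKeySelector`. Without one, the output is non-keyed.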

...

Code Block
languagejava
titleCollector.java
/** This interface is responsible for collecting data to the output stream. */
public interface Collector<OUT> {
    /**
     * Collect record to output stream.
     *
     * @param record to be collected.
     */
    void collect(OUT record);

    /**
     * Overwrite the timestamp of this record and collect it to output stream.
     *
     * @param record to be collected.
     * @param timestamp of the processed data.
     */
    void collectAndOverwriteTimestamp(OUT record, long timestamp);
}
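To make the difference between the two methods concrete, here is a minimal in-memory sketch. It assumes that `collect()` keeps the timestamp of the input record currently being processed, while `collectAndOverwriteTimestamp()` replaces it. `ListCollector`, `TimestampedValue`, and the `currentInputTimestamp` field are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified copy of the proposed Collector interface. */
interface Collector<OUT> {
    void collect(OUT record);

    void collectAndOverwriteTimestamp(OUT record, long timestamp);
}

/** Hypothetical pair of an emitted record and the timestamp attached to it. */
final class TimestampedValue<T> {
    final T value;
    final long timestamp;

    TimestampedValue(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

/** Hypothetical collector that buffers everything it receives in a list. */
final class ListCollector<OUT> implements Collector<OUT> {
    final List<TimestampedValue<OUT>> emitted = new ArrayList<>();
    // Assumption: the runtime sets this before handing each input record to the function.
    long currentInputTimestamp;

    @Override
    public void collect(OUT record) {
        // Keep the timestamp of the input record being processed.
        emitted.add(new TimestampedValue<>(record, currentInputTimestamp));
    }

    @Override
    public void collectAndOverwriteTimestamp(OUT record, long timestamp) {
        // Replace the inherited timestamp with the caller-supplied one.
        emitted.add(new TimestampedValue<>(record, timestamp));
    }
}
```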

NonPartitionedContext

Sometimes the current partition cannot be determined from the processing context, for example when processing input records from a broadcast edge. We therefore introduce a mechanism that applies an operation to all partitions.

Note: RuntimeContext contains information about the context in which process functions are executed. It is currently an empty interface but will be extended later, for example to support state access and timer registration. This part will be elaborated in subsequent sub-FLIPs.

Code Block
languagejava
titleNonPartitionedContext.java
/**
 * This interface represents the context associated with operations that must be applied to all
 * partitions.
 */
public interface NonPartitionedContext<OUT> extends RuntimeContext {
    /**
     * Apply a function to all partitions. For a keyed stream, it is applied to all keys. For a
     * non-keyed stream, it is applied to the single partition.
     */
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}

/** A function to be applied to all partitions. */
@FunctionalInterface
public interface ApplyPartitionFunction<OUT> {
    /**
     * The actual method to be applied to each partition.
     *
     * @param collector to output data.
     * @param ctx runtime context in which this function is executed.
     */
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}

/**
 * This interface represents the context associated with operations that must be applied to all
 * partitions with two outputs.
 */
public interface TwoOutputNonPartitionedContext<OUT1, OUT2> extends RuntimeContext {
    /**
     * Apply a function to all partitions. For a keyed stream, it is applied to all keys. For a
     * non-keyed stream, it is applied to the single partition.
     */
    void applyToAllPartitions(TwoOutputApplyPartitionFunction<OUT1, OUT2> applyPartitionFunction);
}

/** A function to be applied to all partitions with two outputs. */
@FunctionalInterface
public interface TwoOutputApplyPartitionFunction<OUT1, OUT2> {
    /**
     * The actual method to be applied to each partition.
     *
     * @param firstOutput to emit record to first output.
     * @param secondOutput to emit record to second output.
     * @param ctx runtime context in which this function is executed.
     */
    void apply(Collector<OUT1> firstOutput, Collector<OUT2> secondOutput, RuntimeContext ctx)
            throws Exception;
}
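One way a runtime might honor `applyToAllPartitions` on a keyed stream is to invoke the function once per key, each time with a collector bound to that key's output. The sketch below shows this under stated assumptions. `InMemoryKeyedContext`, its `outputPerKey` map, and the use of string keys are hypothetical, and timestamps are not modeled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified copies of the proposed interfaces.
interface RuntimeContext {}

interface Collector<OUT> {
    void collect(OUT record);

    void collectAndOverwriteTimestamp(OUT record, long timestamp);
}

@FunctionalInterface
interface ApplyPartitionFunction<OUT> {
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}

interface NonPartitionedContext<OUT> extends RuntimeContext {
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}

/** Hypothetical keyed context in which every known key counts as one partition. */
final class InMemoryKeyedContext<OUT> implements NonPartitionedContext<OUT> {
    final Map<String, List<OUT>> outputPerKey = new LinkedHashMap<>();

    InMemoryKeyedContext(List<String> keys) {
        for (String key : keys) {
            outputPerKey.put(key, new ArrayList<>());
        }
    }

    @Override
    public void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction) {
        for (List<OUT> partitionOutput : outputPerKey.values()) {
            // Each invocation sees a collector bound to exactly one key's output.
            Collector<OUT> collector =
                    new Collector<OUT>() {
                        @Override
                        public void collect(OUT record) {
                            partitionOutput.add(record);
                        }

                        @Override
                        public void collectAndOverwriteTimestamp(OUT record, long timestamp) {
                            partitionOutput.add(record); // timestamps are not modeled here
                        }
                    };
            try {
                applyPartitionFunction.apply(collector, this);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}
```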

OneInputStreamProcessFunction

Code Block
languagejava
titleOneInputStreamProcessFunction.java
/** This class contains all logic related to processing records from a single input. */
public interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction {
    /**
     * Process record and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecord(IN record, Collector<OUT> output, RuntimeContext ctx) throws Exception;

    /**
     * This is a life-cycle method indicating that this function will no longer receive any input
     * data.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endInput(NonPartitionedContext<OUT> ctx) {}
}
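As a usage sketch, the function below parses text lines into integers and drops lines that are not numbers. The interfaces are trimmed stand-ins for the proposed ones. `ParseIntFunction` and the single-partition `LocalRunner` driver are hypothetical and only imitate what the runtime engine would do.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the proposed interfaces, trimmed to what this example uses.
interface ProcessFunction {}

interface RuntimeContext {}

interface Collector<OUT> {
    void collect(OUT record);
}

interface NonPartitionedContext<OUT> extends RuntimeContext {}

interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction {
    void processRecord(IN record, Collector<OUT> output, RuntimeContext ctx) throws Exception;

    default void endInput(NonPartitionedContext<OUT> ctx) {}
}

/** Parses each line as an integer and drops lines that are not numbers. */
final class ParseIntFunction implements OneInputStreamProcessFunction<String, Integer> {
    @Override
    public void processRecord(String record, Collector<Integer> output, RuntimeContext ctx) {
        try {
            output.collect(Integer.parseInt(record.trim()));
        } catch (NumberFormatException e) {
            // Not a number: emit nothing for this record.
        }
    }
}

/** Hypothetical single-partition driver standing in for the runtime engine. */
final class LocalRunner {
    static <IN, OUT> List<OUT> run(OneInputStreamProcessFunction<IN, OUT> fn, List<IN> input) {
        List<OUT> output = new ArrayList<>();
        Collector<OUT> collector = output::add;
        RuntimeContext ctx = new RuntimeContext() {};
        try {
            for (IN record : input) {
                fn.processRecord(record, collector, ctx);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        // Signal the end of input once every record has been processed.
        fn.endInput(new NonPartitionedContext<OUT>() {});
        return output;
    }
}
```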

...

TwoInputNonBroadcastStreamProcessFunction

Code Block
languagejava
titleTwoInputNonBroadcastStreamProcessFunction.java
/** This class contains all logic related to processing records from two non-broadcast inputs. */
public interface TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    /**
     * Process record from first input and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecordFromFirstInput(IN1 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    /**
     * Process record from second input and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecordFromSecondInput(IN2 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    /**
     * This is a life-cycle method indicating that this function will no longer receive any data
     * from the first input.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endFirstInput(NonPartitionedContext<OUT> ctx) {}

    /**
     * This is a life-cycle method indicating that this function will no longer receive any data
     * from the second input.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endSecondInput(NonPartitionedContext<OUT> ctx) {}
}
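The two inputs may carry different types. The sketch below merges a stream of names (`IN1 = String`) with a stream of numbers (`IN2 = Integer`) into one tagged output stream. `TagSourceFunction` and `TwoInputRunner` are hypothetical. For simplicity, the driver feeds the whole first input before the second, whereas a real runtime may interleave them.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the proposed interfaces, trimmed to what this example uses.
interface ProcessFunction {}

interface RuntimeContext {}

interface Collector<OUT> {
    void collect(OUT record);
}

interface NonPartitionedContext<OUT> extends RuntimeContext {}

interface TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    void processRecordFromFirstInput(IN1 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    void processRecordFromSecondInput(IN2 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    default void endFirstInput(NonPartitionedContext<OUT> ctx) {}

    default void endSecondInput(NonPartitionedContext<OUT> ctx) {}
}

/** Tags each record with the input it came from. */
final class TagSourceFunction
        implements TwoInputNonBroadcastStreamProcessFunction<String, Integer, String> {
    @Override
    public void processRecordFromFirstInput(String record, Collector<String> output, RuntimeContext ctx) {
        output.collect("first:" + record);
    }

    @Override
    public void processRecordFromSecondInput(Integer record, Collector<String> output, RuntimeContext ctx) {
        output.collect("second:" + record);
    }
}

/** Hypothetical single-partition driver: first input fully, then the second input. */
final class TwoInputRunner {
    static <IN1, IN2, OUT> List<OUT> run(
            TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> fn,
            List<IN1> first,
            List<IN2> second) {
        List<OUT> output = new ArrayList<>();
        Collector<OUT> collector = output::add;
        RuntimeContext rc = new RuntimeContext() {};
        NonPartitionedContext<OUT> npc = new NonPartitionedContext<OUT>() {};
        try {
            for (IN1 r : first) {
                fn.processRecordFromFirstInput(r, collector, rc);
            }
            fn.endFirstInput(npc);
            for (IN2 r : second) {
                fn.processRecordFromSecondInput(r, collector, rc);
            }
            fn.endSecondInput(npc);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return output;
    }
}
```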

TwoInputBroadcastStreamProcessFunction

Code Block
languagejava
titleTwoInputBroadcastStreamProcessFunction.java
/**
 * This class contains all logic related to processing records from a broadcast stream and a
 * non-broadcast stream.
 */
public interface TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    /**
     * Process record from non-broadcast input and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecordFromNonBroadcastInput(IN1 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    /**
     * Process record from broadcast input. In general, the broadcast side is not allowed to
     * manipulate state or output data because it corresponds to all partitions instead of a single
     * partition. However, the {@link NonPartitionedContext} can be used to process all partitions
     * at once.
     *
     * @param record to process.
     * @param ctx the context in which this function is executed.
     */
    void processRecordFromBroadcastInput(IN2 record, NonPartitionedContext<OUT> ctx) throws Exception;

    /**
     * This is a life-cycle method indicating that this function will no longer receive any data
     * from the non-broadcast input.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endNonBroadcastInput(NonPartitionedContext<OUT> ctx) {}

    /**
     * This is a life-cycle method indicating that this function will no longer receive any data
     * from the broadcast input.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endBroadcastInput(NonPartitionedContext<OUT> ctx) {}
}
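Because a broadcast record belongs to no single partition, the function cannot emit it through a per-partition collector. Instead, it must go through `applyToAllPartitions`. The sketch below shows this with a hypothetical `AnnounceConfigFunction` and a hypothetical `BroadcastRunner` that simulates several partitions. It assumes non-broadcast record `i` is routed to partition `i % parallelism`, which is an illustrative choice, not the proposal's routing rule.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the proposed interfaces.
interface ProcessFunction {}

interface RuntimeContext {}

interface Collector<OUT> {
    void collect(OUT record);
}

@FunctionalInterface
interface ApplyPartitionFunction<OUT> {
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}

interface NonPartitionedContext<OUT> extends RuntimeContext {
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}

interface TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    void processRecordFromNonBroadcastInput(IN1 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    void processRecordFromBroadcastInput(IN2 record, NonPartitionedContext<OUT> ctx) throws Exception;
}

/** Forwards data records; announces each broadcast record in every partition. */
final class AnnounceConfigFunction
        implements TwoInputBroadcastStreamProcessFunction<String, String, String> {
    @Override
    public void processRecordFromNonBroadcastInput(String record, Collector<String> output, RuntimeContext ctx) {
        output.collect("data:" + record);
    }

    @Override
    public void processRecordFromBroadcastInput(String record, NonPartitionedContext<String> ctx) {
        // No single partition owns this record, so emit it once per partition.
        ctx.applyToAllPartitions((output, rc) -> output.collect("config:" + record));
    }
}

/** Hypothetical runtime with `parallelism` partitions, each with its own output list. */
final class BroadcastRunner {
    static List<List<String>> run(
            TwoInputBroadcastStreamProcessFunction<String, String, String> fn,
            int parallelism,
            List<String> data,
            String broadcastRecord) {
        List<List<String>> outputs = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            outputs.add(new ArrayList<>());
        }
        RuntimeContext rc = new RuntimeContext() {};
        // "All partitions" means running the function once against each partition's output.
        NonPartitionedContext<String> allPartitions =
                applyFn -> {
                    for (List<String> partitionOutput : outputs) {
                        try {
                            applyFn.apply(partitionOutput::add, rc);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                };
        try {
            fn.processRecordFromBroadcastInput(broadcastRecord, allPartitions);
            for (int i = 0; i < data.size(); i++) {
                fn.processRecordFromNonBroadcastInput(data.get(i), outputs.get(i % parallelism)::add, rc);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return outputs;
    }
}
```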

TwoOutputStreamProcessFunction

Code Block
languagejava
titleTwoOutputStreamProcessFunction.java
/** This class contains all logic related to processing and emitting records to two outputs. */
public interface TwoOutputStreamProcessFunction<IN, OUT1, OUT2> extends ProcessFunction {
    /**
     * Process and emit record to the first/second output through {@link Collector}s.
     *
     * @param record to process.
     * @param output1 to emit processed records to the first output.
     * @param output2 to emit processed records to the second output.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecord(
            IN record, Collector<OUT1> output1, Collector<OUT2> output2, RuntimeContext ctx)
            throws Exception;

    /**
     * This is a life-cycle method indicating that this function will no longer receive any input
     * data.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endInput(TwoOutputNonPartitionedContext<OUT1, OUT2> ctx) {}
}
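With two typed outputs, a single function can split one stream into two streams of different types without losing generic type information. The hypothetical `EvenOddSplitter` below sends even numbers to the first output as `Integer` and odd numbers to the second output as `String`. The interfaces are trimmed copies of the proposed ones.

```java
// Simplified stand-ins for the proposed interfaces, trimmed to what this example uses.
interface ProcessFunction {}

interface RuntimeContext {}

interface Collector<OUT> {
    void collect(OUT record);
}

interface TwoOutputNonPartitionedContext<OUT1, OUT2> extends RuntimeContext {}

interface TwoOutputStreamProcessFunction<IN, OUT1, OUT2> extends ProcessFunction {
    void processRecord(
            IN record, Collector<OUT1> output1, Collector<OUT2> output2, RuntimeContext ctx)
            throws Exception;

    default void endInput(TwoOutputNonPartitionedContext<OUT1, OUT2> ctx) {}
}

/** Routes even numbers to the first output and odd numbers, as text, to the second. */
final class EvenOddSplitter implements TwoOutputStreamProcessFunction<Integer, Integer, String> {
    @Override
    public void processRecord(
            Integer record, Collector<Integer> evens, Collector<String> odds, RuntimeContext ctx) {
        if (record % 2 == 0) {
            evens.collect(record);
        } else {
            odds.collect("odd:" + record);
        }
    }
}
```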

Each process function provides an end-of-input life-cycle hook (endInput for single-input functions; endFirstInput/endSecondInput or endNonBroadcastInput/endBroadcastInput for two-input functions). The runtime engine calls this method after all data of the corresponding input has been processed, giving the function a final opportunity to send data downstream. This is crucial for implementing operations such as a full-aggregation window.
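A full aggregation emits nothing per record and a single result once the input has ended. The sketch below sums `Long` values and emits the total from `endInput` via `applyToAllPartitions`. `SumAtEnd` and `EndInputRunner` are hypothetical. The running sum is kept in a plain field only because state access is deferred to later sub-FLIPs, which is an illustrative shortcut, not the intended state mechanism.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the proposed interfaces.
interface ProcessFunction {}

interface RuntimeContext {}

interface Collector<OUT> {
    void collect(OUT record);
}

@FunctionalInterface
interface ApplyPartitionFunction<OUT> {
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}

interface NonPartitionedContext<OUT> extends RuntimeContext {
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}

interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction {
    void processRecord(IN record, Collector<OUT> output, RuntimeContext ctx) throws Exception;

    default void endInput(NonPartitionedContext<OUT> ctx) {}
}

/** Accumulates all records and emits one total when the input ends. */
final class SumAtEnd implements OneInputStreamProcessFunction<Long, Long> {
    private long sum; // plain field for illustration only

    @Override
    public void processRecord(Long record, Collector<Long> output, RuntimeContext ctx) {
        sum += record; // no per-record output
    }

    @Override
    public void endInput(NonPartitionedContext<Long> ctx) {
        final long total = sum;
        ctx.applyToAllPartitions((output, rc) -> output.collect(total));
    }
}

/** Hypothetical single-partition driver: "all partitions" is just this one output. */
final class EndInputRunner {
    static List<Long> run(OneInputStreamProcessFunction<Long, Long> fn, List<Long> input) {
        List<Long> out = new ArrayList<>();
        RuntimeContext rc = new RuntimeContext() {};
        NonPartitionedContext<Long> ctx =
                f -> {
                    try {
                        f.apply(out::add, rc);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                };
        try {
            for (Long r : input) {
                fn.processRecord(r, out::add, rc);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        fn.endInput(ctx); // the final opportunity to emit downstream
        return out;
    }
}
```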

DataStreams

In general, we will expose four types of DataStream interfaces to users: NonKeyedPartitionStream, KeyedPartitionStream, GlobalStream, and BroadcastStream. Partitioning and processing can be applied to these data streams.

NonKeyedPartitionStream

Code Block
languagejava
titleNonKeyedPartitionStream.java
/**
 * This class represents a kind of partitioned data stream. For this stream, each parallel instance
 * is a partition, and the partition to which the data belongs is random.
 */
public interface NonKeyedPartitionStream<T> {
    /**
     * Apply an operation to this {@link NonKeyedPartitionStream}.
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <OUT> NonKeyedPartitionStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two output operation to this {@link NonKeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @return new stream with this operation.
     */
    <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply a two input operation to this and other {@link NonKeyedPartitionStream}.
     *
     * @param other {@link NonKeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            NonKeyedPartitionStream<T_OTHER> other,
            TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link BroadcastStream}.
     *
     * @param other {@link BroadcastStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            BroadcastStream<T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Coalesce this stream to a {@link GlobalStream}.
     *
     * @return the coalesced global stream.
     */
    GlobalStream<T> global();

    /**
     * Transform this stream to a {@link KeyedPartitionStream}.
     *
     * @param keySelector to decide how to map data to partition.
     * @return the transformed stream partitioned by key.
     */
    <K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector);

    /**
     * Transform this stream to a new {@link NonKeyedPartitionStream}; data will be shuffled between
     * these two streams.
     *
     * @return the transformed stream after shuffle.
     */
    NonKeyedPartitionStream<T> shuffle();

    /**
     * Transform this stream to a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    /**
     * Sink data from this stream.
     *
     * @param sink to receive data from this stream.
     */
    void toSink(Sink<T> sink);

    /**
     * This class represents a combination of two {@link NonKeyedPartitionStream}s. It will be used
     * as the return value of operations with two outputs.
     */
    interface TwoNonKeyedPartitionStreams<T1, T2> {
        /** Get the first stream. */
        NonKeyedPartitionStream<T1> getFirst();

        /** Get the second stream. */
        NonKeyedPartitionStream<T2> getSecond();
    }
}
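The difference between shuffle() and keyBy() lies in how a record's partition is chosen. For a non-keyed stream, any partition is valid. For a keyed stream, the partition is a pure function of the key, so records with equal keys always meet. The sketch below illustrates this with a hypothetical `PartitionSketch` helper. It uses `java.util.function.Function` as a stand-in for `KeySelector` and a plain `hashCode()` modulo the parallelism, which is a deliberate simplification: the real runtime uses its own key-group assignment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Function;

/** Hypothetical illustration of non-keyed versus keyed partition assignment. */
final class PartitionSketch {
    /** Keyed: the partition depends only on the extracted key. */
    static <T, K> int keyedPartition(T record, Function<T, K> keySelector, int parallelism) {
        // floorMod keeps the result in [0, parallelism) even for negative hash codes.
        return Math.floorMod(keySelector.apply(record).hashCode(), parallelism);
    }

    /** Non-keyed shuffle: any partition is acceptable, so pick one at random. */
    static int shufflePartition(Random random, int parallelism) {
        return random.nextInt(parallelism);
    }

    /** Distributes records into `parallelism` lists by key. */
    static <T, K> List<List<T>> keyBy(List<T> records, Function<T, K> keySelector, int parallelism) {
        List<List<T>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        for (T record : records) {
            partitions.get(keyedPartition(record, keySelector, parallelism)).add(record);
        }
        return partitions;
    }
}
```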

KeyedPartitionStream

Code Block
languagejava
titleKeyedPartitionStream.java
/**
 * This class represents a kind of partitioned data stream. For this stream, each key group is a
 * partition, and the partition to which the data belongs is determined.
 */
public interface KeyedPartitionStream<K, T> {
    /**
     * Apply an operation to this {@link KeyedPartitionStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the same record, the new {@link KeySelector} must extract the same key as the
     * original {@link KeySelector} on this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <OUT> KeyedPartitionStream<K, OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);

    /**
     * Apply an operation to this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform operation.
     * @return new {@link NonKeyedPartitionStream} with this operation.
     */
    <OUT> NonKeyedPartitionStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two output operation to this {@link KeyedPartitionStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the same record, these two new {@link KeySelector}s must extract the same key as the
     * original {@link KeySelector} on this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @param keySelector1 to select the key of first output.
     * @param keySelector2 to select the key of second output.
     * @return new {@link TwoKeyedPartitionStreams} with this operation.
     */
    <OUT1, OUT2> TwoKeyedPartitionStreams<K, OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction,
            KeySelector<OUT1, K> keySelector1,
            KeySelector<OUT2, K> keySelector2);

    /**
     * Apply a two output operation to this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @return new {@link TwoNonKeyedPartitionStreams} with this operation.
     */
    <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}. The two keyed
     * streams must have the same partitions, otherwise it makes no sense to connect them.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new {@link NonKeyedPartitionStream} with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}. The two keyed
     * streams must have the same partitions, otherwise it makes no sense to connect them.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the same record, the new {@link KeySelector} must extract the same key as the
     * original {@link KeySelector}s on these two {@link KeyedPartitionStream}s.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);

    /**
     * Apply a two input operation to this and other {@link BroadcastStream}.
     *
     * @param other {@link BroadcastStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            BroadcastStream<T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link BroadcastStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the record from non-broadcast input, the new {@link KeySelector} must extract the
     * same key as the original {@link KeySelector} on the {@link KeyedPartitionStream}. For the
     * record from broadcast input, the output key comes from the keyed partition itself instead of
     * the new key selector, so it is already safe.
     *
     * @param other {@link BroadcastStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
            BroadcastStream<T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);

    /**
     * Coalesce this stream to a {@link GlobalStream}.
     *
     * @return the coalesced global stream.
     */
    GlobalStream<T> global();

    /**
     * Transform this stream to a new {@link KeyedPartitionStream}.
     *
     * @param keySelector to decide how to map data to partition.
     * @return the transformed stream partitioned by key.
     */
    <NEW_KEY> KeyedPartitionStream<NEW_KEY, T> keyBy(KeySelector<T, NEW_KEY> keySelector);

    /**
     * Transform this stream to a new {@link NonKeyedPartitionStream}; data will be shuffled between
     * these two streams.
     *
     * @return the transformed stream after shuffle.
     */
    NonKeyedPartitionStream<T> shuffle();

    /**
     * Transform this stream to a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    /**
     * Sink data from this stream.
     *
     * @param sink to receive data from this stream.
     */
    void toSink(Sink<T> sink);

    /**
     * This class represents a combination of two {@link KeyedPartitionStream}s. It will be used as
     * the return value of operations with two outputs.
     */
    interface TwoKeyedPartitionStreams<K, T1, T2> {
        /** Get the first stream. */
        KeyedPartitionStream<K, T1> getFirst();

        /** Get the second stream. */
        KeyedPartitionStream<K, T2> getSecond();
    }
}
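The overloads that take a `newKeySelector` skip the shuffle only if the new selector extracts the same key as the original one for the same record. Violating this would silently put records in the wrong partition. The sketch below shows what such a contract check could look like, for example in a debugging aid. `KeyPreservationCheck` is hypothetical, `java.util.function.Function` stands in for `KeySelector`, and nothing in the proposal states that the runtime performs this check.

```java
import java.util.function.Function;

/** Hypothetical check of the "same key before and after process" contract. */
final class KeyPreservationCheck {
    static <IN, OUT, K> void verify(
            IN input,
            OUT output,
            Function<IN, K> originalKeySelector,
            Function<OUT, K> newKeySelector) {
        K before = originalKeySelector.apply(input);
        K after = newKeySelector.apply(output);
        // If the key changed, the output belongs to another partition and a shuffle is needed.
        if (!before.equals(after)) {
            throw new IllegalStateException(
                    "Key changed from " + before + " to " + after + "; a shuffle would be required");
        }
    }
}
```

For example, records like "alice:5" keyed by the user name may be turned into "alice=5" keyed by the same name. Mapping them to "bob=5" would break the contract.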

GlobalStream

Code Block
languagejava
titleGlobalStream.java
/** This class represents a stream that forces single parallelism. */
public interface GlobalStream<T> {
    /**
     * Apply an operation to this {@link GlobalStream}.
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <OUT> GlobalStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two-output operation to this {@link GlobalStream}.
     *
     * @param processFunction to perform two-output operation.
     * @return new stream with this operation.
     */
    <OUT1, OUT2> TwoGlobalStream<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply a two-input operation to this and another {@link GlobalStream}.
     *
     * @param other {@link GlobalStream} to perform operation with two inputs.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> GlobalStream<OUT> connectAndProcess(
            GlobalStream<T_OTHER> other,
            TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Transform this stream to a {@link KeyedPartitionStream}.
     *
     * @param keySelector to decide how to map data to partition.
     * @return the transformed stream partitioned by key.
     */
    <K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector);

    /**
     * Transform this stream to a new {@link NonKeyedPartitionStream}; data will be shuffled between
     * these two streams.
     *
     * @return the transformed stream after shuffle.
     */
    NonKeyedPartitionStream<T> shuffle();

    /**
     * Transform this stream to a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    /**
     * Sink data from this stream.
     *
     * @param sink to receive data from this stream.
     */
    void toSink(Sink<T> sink);

    /**
     * This class represents a combination of two {@link GlobalStream}s. It is used as the return
     * value of an operation with two outputs.
     */
    interface TwoGlobalStream<T1, T2> {
        /** Get the first stream. */
        GlobalStream<T1> getFirst();

        /** Get the second stream. */
        GlobalStream<T2> getSecond();
    }
}
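To make the routing behind the keyBy, shuffle and broadcast methods of GlobalStream concrete, here is a small self-contained toy model. It is not Flink code: the class PartitioningModel and its methods are hypothetical, and it simply decides which of a hypothetical number of downstream subtasks receives each record emitted by the single global task. Two simplifications are assumptions of this sketch only: keys are mapped with hashCode modulo parallelism (Flink itself hashes keys into key groups), and shuffle uses round-robin as one possible strategy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Hypothetical toy model (not the Flink API): records produced by the single
 * GlobalStream task are routed to `parallelism` downstream subtasks.
 */
final class PartitioningModel {

    /** Creates one empty buffer per downstream subtask. */
    static <T> List<List<T>> emptyPartitions(int parallelism) {
        List<List<T>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        return partitions;
    }

    /** keyBy: all records with the same key land in the same subtask. */
    static <T, K> List<List<T>> keyBy(List<T> records, Function<T, K> keySelector, int parallelism) {
        List<List<T>> partitions = emptyPartitions(parallelism);
        for (T input : records) {
            // Simplification: Flink hashes keys into key groups instead of using hashCode % n.
            int target = Math.floorMod(keySelector.apply(input).hashCode(), parallelism);
            partitions.get(target).add(input);
        }
        return partitions;
    }

    /** shuffle: records are spread across subtasks (round-robin in this sketch). */
    static <T> List<List<T>> shuffle(List<T> records, int parallelism) {
        List<List<T>> partitions = emptyPartitions(parallelism);
        for (int i = 0; i < records.size(); i++) {
            partitions.get(i % parallelism).add(records.get(i));
        }
        return partitions;
    }

    /** broadcast: every subtask receives every record. */
    static <T> List<List<T>> broadcast(List<T> records, int parallelism) {
        List<List<T>> partitions = emptyPartitions(parallelism);
        for (List<T> partition : partitions) {
            partition.addAll(records);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> words = List.of("a", "b", "a", "c", "b", "a");
        System.out.println("keyBy:     " + keyBy(words, w -> w, 2));
        System.out.println("shuffle:   " + shuffle(words, 2));
        System.out.println("broadcast: " + broadcast(words, 2));
    }
}
```

The model shows why the three methods differ in what downstream state can assume: after keyBy a subtask sees every record of its keys, after shuffle it sees an arbitrary subset, and after broadcast it sees everything.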

BroadcastStream

Code Block
languagejava
titleBroadcastStream.java
/** This class represents a stream in which each parallel task processes the same data. */
public interface BroadcastStream<T> {
    /**
     * Apply a two-input operation to this and another {@link KeyedPartitionStream}.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two inputs.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <K, T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two-input operation to this and another {@link NonKeyedPartitionStream}.
     *
     * @param other {@link NonKeyedPartitionStream} to perform operation with two inputs.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            NonKeyedPartitionStream<T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two-input operation to this and another {@link KeyedPartitionStream}.
     *
     * <p>This method is used to avoid a shuffle after applying the process function. For records
     * from the non-broadcast input, the new {@link KeySelector} must extract the same key as the
     * original {@link KeySelector} on the {@link KeyedPartitionStream}. For records from the
     * broadcast input, the output key is taken from the keyed partition itself rather than from
     * the new key selector, so they are already safe.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two inputs.
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <K, T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);
}
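The requirement on newKeySelector can be illustrated with a small toy check. This is a sketch under stated assumptions, not Flink code: NewKeySelectorCheck, Order, Enriched and violations are hypothetical names, the process function is modeled as a plain Function, and the "rate" value merely stands in for data that would arrive on the broadcast side. For every record from the keyed (non-broadcast) input, it compares the original key with the key the new selector extracts from the output; any mismatch means the output would be attributed to the wrong partition if the shuffle were skipped. Whether and how the runtime enforces this contract is not specified here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Hypothetical toy check (not the Flink API) of the contract on newKeySelector:
 * for every record from the keyed (non-broadcast) input, the key extracted from
 * the output must equal the key of the input record that produced it.
 */
final class NewKeySelectorCheck {

    /** A keyed input record: a user id and an amount. */
    record Order(String userId, int amount) {}

    /** An output record, enriched with a value assumed to come from the broadcast side. */
    record Enriched(String userId, int amount, double rate) {}

    /** Returns a description of every record whose output changes its key. */
    static <IN, OUT, K> List<String> violations(
            List<IN> keyedInput,
            Function<IN, K> originalKeySelector,
            Function<IN, OUT> processFunction,
            Function<OUT, K> newKeySelector) {
        List<String> problems = new ArrayList<>();
        for (IN input : keyedInput) {
            K before = originalKeySelector.apply(input);
            OUT output = processFunction.apply(input);
            K after = newKeySelector.apply(output);
            if (!before.equals(after)) {
                problems.add(input + " -> " + output + " changed key " + before + " to " + after);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        double broadcastRate = 1.1; // stand-in for state received from the broadcast input
        List<Order> orders = List.of(new Order("alice", 10), new Order("bob", 20));
        // Keeps userId unchanged, so keying the output by userId needs no shuffle.
        System.out.println(violations(orders, Order::userId,
                o -> new Enriched(o.userId(), o.amount(), broadcastRate), Enriched::userId));
        // Rewrites userId, so the same key selector would violate the contract.
        System.out.println(violations(orders, Order::userId,
                o -> new Enriched("anonymous", o.amount(), broadcastRate), Enriched::userId));
    }
}
```

The first call reports no violations because the output keeps the input key; the second reports one violation per record, which is exactly the case where skipping the shuffle would be unsafe.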

Similar to sources, we only support sinks based on Sink V2.

This FLIP needs to move the following classes from flink-core into the flink-core-api module:

Class Full Path

org.apache.flink.api.common.functions.Function

org.apache.flink.api.java.functions.KeySelector

Compatibility, Deprecation, and Migration Plan

...