Introduction

As the first sub-FLIP for DataStream API V2, we'd like to discuss and try to answer some of the most fundamental questions in stream processing:

  1. What kinds of data streams do we have?
  2. How to partition data over the streams?
  3. How to define processing on the data streams?

...

The specific transformation relationship is shown in the following table:

| Input \ Output Partitioning | Global | Keyed | NonKeyed | Broadcast |
|-----------------------------|--------|-------|----------|-----------|
| Global                      | ❎     | KeyBy | Shuffle  | Broadcast |
| Keyed                       | Global | KeyBy | Shuffle  | Broadcast |
| NonKeyed                    | Global | KeyBy | Shuffle  | Broadcast |
| Broadcast                   | ❎     | ❎    | ❎       | ❎        |

(A crossed box ❎ indicates that it is not supported or not required.)

One thing to note is: broadcast can only be used in conjunction with other inputs and cannot be directly converted to other streams.

ProcessFunction

...

Once we have the data stream, we can apply operations to it. The operations that can be performed over a DataStream are collectively called Process Function. It is the only entrypoint for defining all kinds of processing on the data streams.

Classification of ProcessFunction

According to the number of input / output, they are classified as follows:

| Process Function                          | Number of Inputs | Number of Outputs |
|-------------------------------------------|------------------|-------------------|
| OneInputStreamProcessFunction             | 1                | 1                 |
| TwoInputNonBroadcastStreamProcessFunction | 2                | 1                 |
| TwoInputBroadcastStreamProcessFunction    | 2                | 1                 |
| TwoOutputStreamProcessFunction            | 1                | 2                 |

Logically, process functions that support more inputs and outputs can be achieved by combining the ones above, but such a composition might be inefficient. If demand for this grows, we will consider supporting as many output edges as we want through a mechanism like OutputTag. But this loses the explicit generic type information that comes with using ProcessFunction.

The two-input case is relatively special, and we have divided it into two categories:

  1. TwoInputNonBroadcastStreamProcessFunction: Neither of its inputs is a BroadcastStream, so processing is only applied to a single partition.
  2. TwoInputBroadcastStreamProcessFunction: One of its inputs is a BroadcastStream, so the processing of that input is applied to all partitions, while the other input is a Keyed/Non-Keyed Stream whose processing is applied to a single partition.

Advantages of ProcessFunction

...

  • Clearer definition: From the DataStream's perspective, it only needs to understand the semantics of functions. Built-in operations such as map / flatMap / reduce / join can still be supported, but are decoupled from the core framework. That is to say, for DataStream V2, every operation is a process function.

  • Don't expose operators to users: We believe functions with access to proper runtime information and services are good enough for users to define custom data processing logic. Operators, on the other hand, are more of an internal concept of Flink and users should not be allowed to use them directly. Besides, in V1 users are invited to extend `AbstractStreamOperator` in order to define their custom operators, leading to unnecessary dependencies and unpredictable behaviors. In V2, users should define their custom behaviors by implementing interfaces rather than extending framework classes.

...

For OneInputStreamProcessFunction:

| Input Stream | Output Stream     |
|--------------|-------------------|
| Global       | Global            |
| Keyed        | Keyed / Non-Keyed |
| NonKeyed     | NonKeyed          |
| Broadcast    | Not Supported     |

When a KeyedPartitionStream is used as input, the output can be either a KeyedPartitionStream or a NonKeyedPartitionStream. For general data processing logic, how the data will be partitioned after processing is uncertain, so we can only expect a NonKeyedPartitionStream; if a deterministic partitioning is needed, it can be followed by a KeyBy partitioning. However, there are times when we know for sure that the partitioning of records will not change during processing, and the shuffle cost of the extra partitioning can then be avoided. To be safe, in this case we ask for a KeySelector for the output data, and the framework checks at runtime whether this invariant is broken. The same holds for the two-output and two-input counterparts. For a more detailed explanation, see the API definition of KeyedPartitionStream in the Proposed Changes section below.
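To illustrate the difference, here is a minimal sketch against the KeyedPartitionStream API proposed below. The stream, the Word type and the NormalizeFunction are hypothetical and only serve as an example:

Code Block
languagejava
titleKeyedOutputSketch.java
// Assume a hypothetical KeyedPartitionStream<String, Word> keyed by Word::getWord.
KeyedPartitionStream<String, Word> words = ...;

// General case: the framework cannot know how the output is partitioned, so the result is
// non-keyed. A subsequent keyBy would introduce an extra shuffle.
NonKeyedPartitionStream<Word> normalized = words.process(new NormalizeFunction());

// If the processing is known to preserve the key, pass a KeySelector for the output and the
// result stays keyed without any shuffle. The framework checks this invariant at runtime.
KeyedPartitionStream<String, Word> stillKeyed =
        words.process(new NormalizeFunction(), Word::getWord);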

...

For TwoOutputStreamProcessFunction:

| Input Stream | Output Streams                        |
|--------------|---------------------------------------|
| Global       | Global + Global                       |
| Keyed        | Keyed + Keyed / Non-Keyed + Non-Keyed |
| NonKeyed     | NonKeyed + NonKeyed                   |
| Broadcast    | Not Supported                         |

There are two points to note here:

...

A cross (❎) indicates that the combination is not supported. The cell value is the type of the output stream.

| Input1 \ Input2 | Global | Keyed             | NonKeyed  | Broadcast         |
|-----------------|--------|-------------------|-----------|-------------------|
| Global          | Global | ❎                | ❎        | ❎                |
| Keyed           | ❎     | Non-Keyed / Keyed | ❎        | Non-Keyed / Keyed |
| NonKeyed        | ❎     | ❎                | Non-Keyed | Non-Keyed         |
| Broadcast       | ❎     | Non-Keyed / Keyed | Non-Keyed | ❎                |

  1. The connection between a Global Stream and a non-Global Stream is not supported because the number of partitions of a GlobalStream is forced to be 1, while it is generally not 1 for a non-Global Stream; this would cause a conflict when determining the number of partitions of the output stream. If necessary, they should first be transformed into mutually compatible streams and then connected.
  2. Connecting two broadcast streams doesn't really make sense, because each parallel instance would receive exactly the same input data from both streams and any processing would be duplicated.
  3. The reason why the output of two keyed partition streams can be keyed or non-keyed is the same as mentioned above in the single-input case.
  4. When we connect two KeyedPartitionStreams, they must have the same key type, otherwise we can't decide how to merge the partitions of the two streams. At the same time, accessing state and registering timers are also restricted to the partition itself, so cross-partition interaction is not meaningful (see the sketch after this list).
  5. The connection between KeyedPartitionStream and NonKeyedPartitionStream is not supported for the following reasons:
    1. The partitioning of data on a KeyedStream is deterministic, but on a NonKeyed stream it is not. It is difficult to think of a scenario where the two need to be connected.
    2. It would complicate the state declaration and access rules. A more detailed discussion can be found in the subsequent state-related sub-FLIP.
    3. If we see that most people have a clear demand for this, we can support it in the future.
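The key-type constraint from note 4 can be illustrated with a small sketch against the proposed API; Order, Payment, OrderWithPayment, MatchOrderWithPayment and the streams themselves are hypothetical:

Code Block
languagejava
titleConnectKeyedStreamsSketch.java
// Both streams are keyed by the same key type (Long), so their partitions can be merged.
KeyedPartitionStream<Long, Order> orders = ...;
KeyedPartitionStream<Long, Payment> payments = ...;

// MatchOrderWithPayment is a hypothetical
// TwoInputNonBroadcastStreamProcessFunction<Order, Payment, OrderWithPayment>.
NonKeyedPartitionStream<OrderWithPayment> matched =
        orders.connectAndProcess(payments, new MatchOrderWithPayment());

// Connecting a KeyedPartitionStream<Long, Order> with a KeyedPartitionStream<String, User>
// is not expressible, since there is no way to merge partitions of different key types.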

...

Before introducing the specific changes, let's first look at what the simplest job (incrementing every record by one) developed with the new API looks like:

Code Block
languagejava
titleExample.java
// create environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// create a stream from source
env.fromSource(someSource)
    // map every element x to x + 1. This is just to show the API as comprehensively as possible. In fact, we can use lambda expressions instead.
    .process(new OneInputStreamProcessFunction<Integer, Integer>() {
                    @Override
                    public void processRecord(
                            Integer x,
                            Collector<Integer> output,
                            RuntimeContext ctx)
                            throws Exception {
                        output.collect(x + 1);
                    }
                })
    // If the sink does not support concurrent writes, we can force the stream to one partition
    .global()
    // sink the stream to some sink
    .toSink(someSink);
// execute the job
env.execute();

...

Currently, we only support adding FLIP-27 based sources. The stream returned from the `fromSource` method is a NonKeyedPartitionStream by default. If there is a clear key-selection strategy, a keyBy partitioning can be applied afterwards. The connector part will be explained in more detail in a future FLIP.
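As a minimal sketch of this, assuming a FLIP-27 based source named someKafkaSource that produces String records:

Code Block
languagejava
titleSourceSketch.java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// fromSource returns a NonKeyedPartitionStream by default.
NonKeyedPartitionStream<String> lines = env.fromSource(someKafkaSource);

// If there is a clear key-selection strategy, follow it with keyBy to obtain a keyed stream.
KeyedPartitionStream<String, String> keyedLines = lines.keyBy(line -> line.split(",")[0]);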

ProcessFunction

...

Process function is used to describe the processing logic of data. It is the key part for users to implement their jobs. Overall, we have a base interface for all user-defined process functions that contains some life-cycle methods, such as open and close. In addition, it also contains some common methods related to state and watermarks, but we omit these methods here for simplicity and will introduce them in the corresponding sub-FLIPs.

Code Block
languagejava
titleProcessFunction.java
/** This is the base class for all user defined process functions. */
public interface ProcessFunction extends Function {
    /**
     * Initialization method for the function. It is called before the actual working methods (like
     * processRecord) and thus suitable for one time setup work.
     *
     * <p>By default, this method does nothing.
     *
     * @throws Exception Implementations may forward exceptions, which are caught by the runtime.
     *     When the runtime catches an exception, it aborts the task and lets the fail-over logic
     *     decide whether to retry the task execution.
     */
    default void open() throws Exception {}

    /**
     * Tear-down method for the user code. It is called after the last call to the main working
     * methods (e.g. processRecord).
     *
     * <p>This method can be used for clean up work.
     *
     * @throws Exception Implementations may forward exceptions, which are caught by the runtime.
     *     When the runtime catches an exception, it aborts the task and lets the fail-over logic
     *     decide whether to retry the task execution.
     */
    default void close() throws Exception {}

    // Omit some methods related to state and watermark here.
}

...

Code Block
languagejava
titleCollector.java
/** This class is responsible for collecting data to the output stream. */
public interface Collector<OUT> {
    /**
     * Collect record to output stream.
     *
     * @param record to be collected.
     */
    void collect(OUT record);

    /**
     * Overwrite the timestamp of this record and collect it to output stream.
     *
     * @param record to be collected.
     * @param timestamp of the processed data.
     */
    void collectAndOverwriteTimestamp(OUT record, long timestamp);
}
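As a small sketch of how the two collect variants are meant to be used inside the OneInputStreamProcessFunction defined below (the SensorReading type and its eventTimeMillis field are made up for illustration):

Code Block
languagejava
titleCollectorUsageSketch.java
OneInputStreamProcessFunction<SensorReading, SensorReading> reemit =
        new OneInputStreamProcessFunction<SensorReading, SensorReading>() {
            @Override
            public void processRecord(
                    SensorReading reading, Collector<SensorReading> output, RuntimeContext ctx)
                    throws Exception {
                // collect(reading) would keep the current timestamp; here we overwrite it with
                // the event time carried by the record itself.
                output.collectAndOverwriteTimestamp(reading, reading.eventTimeMillis);
            }
        };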

NonPartitionedContext

Sometimes it is not possible to decide on the current partition in the context of the processing, for example, when processing input records from the broadcast edge. Therefore, we introduce a mechanism to process all partitions at once.

Note: RuntimeContext contains information about the context in which process functions are executed. It is currently just an empty interface but will be expanded later, for example to support accessing state and registering timers. This part will be elaborated in the subsequent sub-FLIPs.

Code Block
languagejava
titleNonPartitionedContext.java
/**
 * This interface represents the context in which operations must be applied to all partitions.
 */
public interface NonPartitionedContext<OUT> extends RuntimeContext {
    /**
     * Apply a function to all partitions. For keyed stream, it will apply to all keys. For
     * non-keyed stream, it will apply to the single partition.
     */
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}

/** A function to be applied to all partitions. */
@FunctionalInterface
public interface ApplyPartitionFunction<OUT> {
    /**
     * The actual method to be applied to each partition.
     *
     * @param collector to output data.
     * @param ctx runtime context in which this function is executed.
     */
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}

/**
 * This interface represents the context in which operations with two outputs must be applied to
 * all partitions.
 */
public interface TwoOutputNonPartitionedContext<OUT1, OUT2> extends RuntimeContext {
    /**
     * Apply a function to all partitions. For keyed stream, it will apply to all keys. For
     * non-keyed stream, it will apply to the single partition.
     */
    void applyToAllPartitions(TwoOutputApplyPartitionFunction<OUT1, OUT2> applyPartitionFunction);
}

/** A function to be applied to all partitions with two outputs. */
@FunctionalInterface
public interface TwoOutputApplyPartitionFunction<OUT1, OUT2> {
    /**
     * The actual method to be applied to each partition.
     *
     * @param firstOutput to emit record to first output.
     * @param secondOutput to emit record to second output.
     * @param ctx runtime context in which this function is executed.
     */
    void apply(Collector<OUT1> firstOutput, Collector<OUT2> secondOutput, RuntimeContext ctx)
            throws Exception;
}

OneInputStreamProcessFunction

Code Block
languagejava
titleOneInputStreamProcessFunction.java
/** This class contains all logic related to processing records from a single input. */
public interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction {
    /**
     * Process record and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecord(IN record, Collector<OUT> output, RuntimeContext ctx) throws Exception;

    /**
     * This is a life-cycle method indicating that this function will no longer receive any input
     * data.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endInput(NonPartitionedContext<OUT> ctx) {}
}
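For example, here is a minimal sketch of a function that uses the endInput hook to emit a single result only after all input has been consumed (per-subtask counting on the heap, ignoring state handling, which is covered by a later sub-FLIP):

Code Block
languagejava
titleCountOnEndInputSketch.java
public class CountOnEndInput implements OneInputStreamProcessFunction<String, Long> {
    // Kept on the heap for simplicity; proper state handling is discussed in a later sub-FLIP.
    private long count;

    @Override
    public void processRecord(String record, Collector<Long> output, RuntimeContext ctx)
            throws Exception {
        count++;
    }

    @Override
    public void endInput(NonPartitionedContext<Long> ctx) {
        // Once no more input will arrive, emit the final count via the all-partitions context.
        ctx.applyToAllPartitions((collector, runtimeContext) -> collector.collect(count));
    }
}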

...

TwoInputNonBroadcastStreamProcessFunction

Code Block
languagejava
titleTwoInputNonBroadcastStreamProcessFunction.java
/** This class contains all logic related to processing records from two non-broadcast inputs. */
public interface TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    /**
     * Process record from the first input and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecordFromFirstInput(IN1 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    /**
     * Process record from the second input and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecordFromSecondInput(IN2 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    /**
     * This is a life-cycle method indicating that this function will no longer receive any data
     * from the first input.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endFirstInput(NonPartitionedContext<OUT> ctx) {}

    /**
     * This is a life-cycle method indicating that this function will no longer receive any data
     * from the second input.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endSecondInput(NonPartitionedContext<OUT> ctx) {}
}
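A minimal sketch of an implementation, tagging records from the two inputs and merging them into one output stream (types chosen arbitrarily for illustration):

Code Block
languagejava
titleTagAndMergeSketch.java
public class TagAndMerge implements TwoInputNonBroadcastStreamProcessFunction<String, Integer, String> {
    @Override
    public void processRecordFromFirstInput(
            String record, Collector<String> output, RuntimeContext ctx) throws Exception {
        output.collect("first: " + record);
    }

    @Override
    public void processRecordFromSecondInput(
            Integer record, Collector<String> output, RuntimeContext ctx) throws Exception {
        output.collect("second: " + record);
    }
}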

TwoInputBroadcastStreamProcessFunction

Code Block
languagejava
titleTwoInputBroadcastStreamProcessFunction.java
/**
 * This class contains all logic related to processing records from a broadcast stream and a
 * non-broadcast stream.
 */
public interface TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    /**
     * Process record from the non-broadcast input and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecordFromNonBroadcastInput(IN1 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    /**
     * Process record from the broadcast input. In general, the broadcast side is not allowed to
     * manipulate state and output data because it corresponds to all partitions instead of a
     * single partition. But you could use the broadcast context to process all the partitions at
     * once.
     *
     * @param record to process.
     * @param ctx the context in which this function is executed.
     */
    void processRecordFromBroadcastInput(IN2 record, NonPartitionedContext<OUT> ctx) throws Exception;

    /**
     * This is a life-cycle method indicating that this function will no longer receive any data
     * from the non-broadcast input.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endNonBroadcastInput(NonPartitionedContext<OUT> ctx) {}

    /**
     * This is a life-cycle method indicating that this function will no longer receive any data
     * from the broadcast input.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endBroadcastInput(NonPartitionedContext<OUT> ctx) {}
}
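A minimal sketch of an implementation: the broadcast input delivers a threshold to every parallel instance, and the non-broadcast input is filtered against the most recent threshold (types and field layout are illustrative only):

Code Block
languagejava
titleThresholdFilterSketch.java
public class ThresholdFilter implements TwoInputBroadcastStreamProcessFunction<Long, Long, Long> {
    // Latest threshold seen on the broadcast input; state handling is left to a later sub-FLIP.
    private long threshold = Long.MIN_VALUE;

    @Override
    public void processRecordFromNonBroadcastInput(
            Long value, Collector<Long> output, RuntimeContext ctx) throws Exception {
        if (value >= threshold) {
            output.collect(value);
        }
    }

    @Override
    public void processRecordFromBroadcastInput(Long newThreshold, NonPartitionedContext<Long> ctx)
            throws Exception {
        // The broadcast side cannot emit to a single partition; it only updates local bookkeeping
        // here. ctx.applyToAllPartitions(...) could be used to emit to every partition instead.
        threshold = newThreshold;
    }
}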

TwoOutputStreamProcessFunction

Code Block
languagejava
titleTwoOutputStreamProcessFunction.java
/** This class contains all logic related to processing and emitting records to two outputs. */
public interface TwoOutputStreamProcessFunction<IN, OUT1, OUT2> extends ProcessFunction {
    /**
     * Process and emit record to the first/second output through {@link Collector}s.
     *
     * @param record to process.
     * @param output1 to emit processed records to the first output.
     * @param output2 to emit processed records to the second output.
     * @param ctx runtime context in which this function is executed.
     */
    void processRecord(
            IN record, Collector<OUT1> output1, Collector<OUT2> output2, RuntimeContext ctx);

    /**
     * This is a life-cycle method indicating that this function will no longer receive any input
     * data.
     *
     * @param ctx the context in which this function is executed.
     */
    default void endInput(TwoOutputNonPartitionedContext<OUT1, OUT2> ctx) {}
}
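A minimal sketch of an implementation that splits integers into an even and an odd output:

Code Block
languagejava
titleSplitEvenOddSketch.java
public class SplitEvenOdd implements TwoOutputStreamProcessFunction<Integer, Integer, Integer> {
    @Override
    public void processRecord(
            Integer record, Collector<Integer> evens, Collector<Integer> odds, RuntimeContext ctx) {
        if (record % 2 == 0) {
            evens.collect(record);
        } else {
            odds.collect(record);
        }
    }
}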

We can see that each process function provides a life-cycle hook for the end of its input (endInput and its variants). The runtime engine calls this method back after all data of the corresponding input has been processed, providing a final opportunity to send data downstream. This is crucial for implementing something like a full-aggregation window.

DataStreams

In general, we will expose 4 types of DataStream interfaces to users; partitioning and process functions can be applied to these data streams.

NonKeyedPartitionStream

Code Block
languagejava
titleNonKeyedPartitionStream.java
/**
 * This class represents a kind of partitioned data stream. For this stream, each parallelism is a
 * partition, and the partition to which the data belongs is random.
 */
public interface NonKeyedPartitionStream<T> {
    /**
     * Apply an operation to this {@link NonKeyedPartitionStream}.
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <OUT> NonKeyedPartitionStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two output operation to this {@link NonKeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @return new stream with this operation.
     */
    <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply a two input operation to this and other {@link NonKeyedPartitionStream}.
     *
     * @param other {@link NonKeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            NonKeyedPartitionStream<T_OTHER> other,
            TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link BroadcastStream}.
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            BroadcastStream<T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Coalesce this stream to a {@link GlobalStream}.
     *
     * @return the coalesced global stream.
     */
    GlobalStream<T> global();

    /**
     * Transform this stream to a {@link KeyedPartitionStream}.
     *
     * @param keySelector to decide how to map data to partition.
     * @return the transformed stream partitioned by key.
     */
    <K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector);

    /**
     * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between
     * these two streams.
     *
     * @return the transformed stream after shuffle.
     */
    NonKeyedPartitionStream<T> shuffle();

    /**
     * Transform this stream to a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    /**
     * Sink data from this stream.
     *
     * @param sink to receive data from this stream.
     */
    void toSink(Sink<T> sink);

    /**
     * This class represents a combination of two {@link NonKeyedPartitionStream}. It will be used
     * as the return value of operation with two output.
     */
    interface TwoNonKeyedPartitionStreams<T1, T2> {
        /** Get the first stream. */
        NonKeyedPartitionStream<T1> getFirst();

        /** Get the second stream. */
        NonKeyedPartitionStream<T2> getSecond();
    }
}
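As a small usage sketch of the two-output variant and the nested TwoNonKeyedPartitionStreams type (SplitEvenOdd is the hypothetical function sketched earlier; numbers, evenSink and oddSink are placeholders):

Code Block
languagejava
titleTwoNonKeyedPartitionStreamsSketch.java
NonKeyedPartitionStream<Integer> numbers = ...;

NonKeyedPartitionStream.TwoNonKeyedPartitionStreams<Integer, Integer> split =
        numbers.process(new SplitEvenOdd());

split.getFirst().toSink(evenSink);            // even numbers
split.getSecond().global().toSink(oddSink);   // odd numbers, coalesced into one partition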

KeyedPartitionStream

Code Block
languagejava
titleKeyedPartitionStream.java
/**
 * This class represents a kind of partitioned data stream. For this stream, each key group is a
 * partition, and the partition to which the data belongs is determined.
 */
public interface KeyedPartitionStream<K, T> {
    /**
     * Apply an operation to this {@link KeyedPartitionStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the same record, the new {@link KeySelector} must extract the same key as the
     * original {@link KeySelector} on this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <OUT> KeyedPartitionStream<K, OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);

    /**
     * Apply an operation to this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform operation.
     * @return new {@link NonKeyedPartitionStream} with this operation.
     */
    <OUT> NonKeyedPartitionStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two output operation to this {@link KeyedPartitionStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the same record, these two new {@link KeySelector}s must extract the same key as
     * the original {@link KeySelector}s on this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @param keySelector1 to select the key of first output.
     * @param keySelector2 to select the key of second output.
     * @return new {@link TwoKeyedPartitionStreams} with this operation.
     */
    <OUT1, OUT2> TwoKeyedPartitionStreams<K, OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction,
            KeySelector<OUT1, K> keySelector1,
            KeySelector<OUT2, K> keySelector2);

    /**
     * Apply a two output operation to this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @return new {@link TwoNonKeyedPartitionStreams} with this operation.
     */
    <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}. The two keyed
     * streams must have the same partitions, otherwise it makes no sense to connect them.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new {@link NonKeyedPartitionStream} with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}. The two keyed
     * streams must have the same partitions, otherwise it makes no sense to connect them.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the same record, the new {@link KeySelector} must extract the same key as the
     * original {@link KeySelector}s on these two {@link KeyedPartitionStream}s.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);

    /**
     * Apply a two input operation to this and other {@link BroadcastStream}.
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            BroadcastStream<T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link BroadcastStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the record from non-broadcast input, the new {@link KeySelector} must extract the
     * same key as the original {@link KeySelector}s on the {@link KeyedPartitionStream}. For the
     * record from broadcast input, the output key is from the keyed partition itself instead of
     * the new key selector, so it is safe already.
     *
     * @param other {@link BroadcastStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
            BroadcastStream<T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);

    /**
     * Coalesce this stream to a {@link GlobalStream}.
     *
     * @return the coalesced global stream.
     */
    GlobalStream<T> global();

    /**
     * Transform this stream to a new {@link KeyedPartitionStream}.
     *
     * @param keySelector to decide how to map data to partition.
     * @return the transformed stream partitioned by key.
     */
    <NEW_KEY> KeyedPartitionStream<NEW_KEY, T> keyBy(KeySelector<T, NEW_KEY> keySelector);

    /**
     * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between
     * these two streams.
     *
     * @return the transformed stream after shuffle.
     */
    NonKeyedPartitionStream<T> shuffle();

    /**
     * Transform this stream to a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    /**
     * Sink data from this stream.
     *
     * @param sink to receive data from this stream.
     */
    void toSink(Sink<T> sink);

    /**
     * This class represents a combination of two {@link KeyedPartitionStream}. It will be used as
     * the return value of operation with two output.
     */
    interface TwoKeyedPartitionStreams<K, T1, T2> {
        /** Get the first stream. */
        KeyedPartitionStream<K, T1> getFirst();

        /** Get the second stream. */
        KeyedPartitionStream<K, T2> getSecond();
    }
}

GlobalStream

Code Block
languagejava
titleGlobalStream.java
/** This class represents a stream that forces single parallelism. */
public interface GlobalStream<T> {
    /**
     * Apply an operation to this {@link GlobalStream};
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <OUT> GlobalStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two output operation to this {@link GlobalStream}.
     *
     * @param processFunction to perform two output operation.
     * @return new stream with this operation.
     */
    <OUT1, OUT2> TwoGlobalStream<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply a two input operation to this and other {@link GlobalStream}.
     *
     * @param other {@link GlobalStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> GlobalStream<OUT> connectAndProcess(
            GlobalStream<T_OTHER> other,
            TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Transform this stream to a {@link KeyedPartitionStream}.
     *
     * @param keySelector to decide how to map data to partition.
     * @return the transformed stream partitioned by key.
     */
    <K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector);

    /**
     * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between
     * these two streams.
     *
     * @return the transformed stream after shuffle.
     */
    NonKeyedPartitionStream<T> shuffle();

    /**
     * Transform this stream to a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    /**
     * Sink data from this stream.
     *
     * @param sink to receive data from this stream.
     */
    void toSink(Sink<T> sink);

    /**
     * This class represents a combination of two {@link GlobalStream}. It will be used as the
     * return value of operation with two output.
     */
    interface TwoGlobalStream<T1, T2> {
        /** Get the first stream. */
        GlobalStream<T1> getFirst();

        /** Get the second stream. */
        GlobalStream<T2> getSecond();
    }
}
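A small usage sketch: coalescing a stream to a GlobalStream for a final single-parallelism step (CountOnEndInput is the hypothetical function sketched earlier; lines and someSink are placeholders):

Code Block
languagejava
titleGlobalStreamSketch.java
NonKeyedPartitionStream<String> lines = ...;

lines.global()                          // force a single partition
        .process(new CountOnEndInput()) // emits one total count when the input ends
        .toSink(someSink);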

BroadcastStream

Code Block
languagejava
titleBroadcastStream.java
/** This class represents a stream in which each parallel task processes the same data. */
public interface BroadcastStream<T> {
    /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <K, T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

   /**
     * Apply a two input operation to this and other {@link NonKeyedPartitionStream}.
     *
     * @param other {@link NonKeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            NonKeyedPartitionStream<T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

   /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the record from non-broadcast input, the new {@link KeySelector} must extract the
     * same key as the original {@link KeySelector}s on the {@link KeyedPartitionStream}. For the
     * record from broadcast input, the output key from keyed partition itself instead of new key
     * selector, so it is safe already.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <K, T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector); 
}
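A small usage sketch of connecting from the broadcast side (ThresholdFilter is the hypothetical function sketched earlier; the streams are placeholders). Regardless of which side the connection is initiated from, the framework routes the broadcast records to processRecordFromBroadcastInput and the keyed records to processRecordFromNonBroadcastInput:

Code Block
languagejava
titleBroadcastStreamSketch.java
BroadcastStream<Long> thresholds = ...;
KeyedPartitionStream<Long, Long> readingsByDevice = ...;

NonKeyedPartitionStream<Long> filtered =
        thresholds.connectAndProcess(readingsByDevice, new ThresholdFilter());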

Similar to sources, we currently only support sinkV2-based sinks.

The FLIP needs to move the following classes from the flink-core module into the flink-core-api module:

  • org.apache.flink.api.common.functions.Function
  • org.apache.flink.api.java.functions.KeySelector

Compatibility, Deprecation, and Migration Plan

The proposed new DataStream API and the old API are incompatible.
The deprecation and migration plan are discussed in the umbrella FLIP.

Test Plan

Comprehensive unit tests and integration tests will be added to ensure correctness. In addition, some jobs based on the old API will be selected and rewritten for verification.
