THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Process Function | number of inputs | number of outputs |
OneInputStreamProcessFunction | 1 | 1 |
TwoInputNonBroadcastStreamProcessFunction (formerly TwoInputStreamProcessFunction) | 2 | 1 |
TwoInputBroadcastStreamProcessFunction (formerly BroadcastTwoInputStreamProcessFunction) | 2 | 1 |
TwoOutputStreamProcessFunction | 1 | 2 |
...
The two-input case is special, so we divide it into two categories:
- TwoInputNonBroadcastStreamProcessFunction (formerly TwoInputStreamProcessFunction): neither of its inputs is a BroadcastStream, so processing is only applied to a single partition.
- TwoInputBroadcastStreamProcessFunction (formerly BroadcastTwoInputStreamProcessFunction): one of its inputs is a BroadcastStream, so the processing of that input is applied to all partitions. The other input is a Keyed/Non-Keyed Stream, and its processing is applied to a single partition.
...
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Context handed to operations whose effect must reach every partition, rather than only a
 * single one.
 *
 * @param <OUT> type of the records that may be emitted while applying to partitions.
 */
public interface NonPartitionedContext<OUT> extends RuntimeContext {

    /**
     * Runs the supplied function against every partition. On a keyed stream this covers every
     * key; on a non-keyed stream it covers only the single partition.
     *
     * @param applyPartitionFunction the function to run for each partition.
     */
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}
/**
 * Callback that is applied to every partition.
 *
 * @param <OUT> type of the records the callback may emit.
 */
@FunctionalInterface
public interface ApplyPartitionFunction<OUT> {

    /**
     * Invoked for each partition this function is applied to.
     *
     * @param collector sink through which output records are emitted.
     * @param ctx the runtime context this invocation runs in.
     * @throws Exception if the callback body fails.
     */
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}
/**
 * This interface represents the context associated with all operations that must be applied to
 * all partitions, for functions with two outputs.
 *
 * @param <OUT1> type of the records emitted to the first output.
 * @param <OUT2> type of the records emitted to the second output.
 */
public interface TwoOutputNonPartitionedContext<OUT1, OUT2> extends RuntimeContext {
    /**
     * Apply a function to all partitions. For a keyed stream, it will apply to all keys. For a
     * non-keyed stream, it will apply to the single partition.
     *
     * @param applyPartitionFunction the two-output function to apply to each partition. (The
     *     original declaration mistakenly accepted a {@code TwoOutputNonPartitionedContext} here,
     *     which cannot be applied to a partition.)
     */
    void applyToAllPartitions(TwoOutputApplyPartitionFunction<OUT1, OUT2> applyPartitionFunction);
}
/** A function to be applied to all partitions with two outputs.. */
@FunctionalInterface
public interface TwoOutputApplyPartitionFunction<OUT1, OUT2> {
/**
* The actual method to be applied to each partition.
*
* @param firstOutput to emit record to first output.
* @param secondOutput to emit record to second output.
* @param ctx runtime context in which this function is executed.
*/
void apply(Collector<OUT1> firstOutput, Collector<OUT2> secondOutput, RuntimeContext ctx)
throws Exception;
} |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** This class contains all logical related to process records from single input. */ public interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction { /** * Process record and emit data through {@link Collector}. * * @param record to process. * @param output to emit processed records. * @param ctx, runtime context in which this function is executed. */ void processRecord(IN record, Collector<OUT> output, RuntimeContext ctx) throws Exception; /** * This is a life-cycle method indicates that this function will no longer receive any input * data. * * @param ctx, the context in which this function is executed. */ default void endInput(NonPartitionedContext ctx) {} } |
...
TwoInputNonBroadcastStreamProcessFunction
Code Block | ||||
---|---|---|---|---|
| ||||
/** This class contains all logical related to process records from two input. */ public interface TwoInputStreamProcessFunction<IN1TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction { /** * Process record from first input and emit data through {@link Collector}. * * @param record to process. * @param output to emit processed records. * @param ctx, runtime context in which this function is executed. */ void processRecordFromFirstInput(IN1 record, Collector<OUT> output, RuntimeContext ctx) throws Exception; /** * Process record from second input and emit data through {@link Collector}. * * @param record to process. * @param output to emit processed records. * @param ctx, runtime context in which this function is executed. */ void processRecordFromSecondInput(IN2 record, Collector<OUT> output, RuntimeContext ctx) throws Exception; /** * This is a life-cycle method indicates that this function will no longer receive any data from * the first input. * * @param ctx, the context in which this function is executed. */ default void endFirstInput(NonPartitionedContext ctx) {} /** * This is a life-cycle method indicates that this function will no longer receive any data from * the second input. * * @param ctx, the context in which this function is executed. */ default void endSecondInput(NonPartitionedContext ctx) {} } |
...
TwoInputBroadcastStreamProcessFunction
Code Block | ||||
---|---|---|---|---|
| ||||
/** * This class contains all logical related to process records from a broadcast stream and a * non-broadcast stream. */ public interface BroadcastTwoInputStreamProcessFunction<IN1TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction { /** * Process record from non-broadcast input and emit data through {@link Collector}. * * @param record to process. * @param output to emit processed records. * @param ctx, runtime context in which this function is executed. */ void processRecordFromNonBroadcastInput(IN1 record, Collector<OUT> output, RuntimeContext ctx) throws Exception; /** * Process record from broadcast input.In general, the broadcast side is not allowed to * manipulate state and output data because it corresponds to all partitions instead of a single * partition. But you could use broadcast context to process all the partitions at once. * * @param record to process. * @param ctx, the context in which this function is executed. */ void processRecordFromBroadcastInput(IN2 record, NonPartitionedContext<OUT> ctx) throws Exception; /** * This is a life-cycle method indicates that this function will no longer receive any data from * the non-broadcast input. * * @param ctx, the context in which this function is executed. */ default void endNonBroadcastInput(NonPartitionedContext ctx) {} /** * This is a life-cycle method indicates that this function will no longer receive any data from * the broadcast input. * * @param ctx, the context in which this function is executed. */ default void endBroadcastInput(NonPartitionedContext<OUT> ctx) {} } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * This class represents a kind of partitioned data stream. For this stream, each parallelism is a * partition, and the partition to which the data belongs is random. */ public interface NonKeyedPartitionStream<T> { /** * Apply an operation to this {@link NonKeyedPartitionStream}; * * @param processFunction to perform operation. * @return new stream with this operation. */ <OUT> NonKeyedPartitionStream<OUT> process( OneInputStreamProcessFunction<T, OUT> processFunction); /** * Apply a two output operation to this {@link NonKeyedPartitionStream}. * * @param processFunction to perform two output operation. * @return new stream with this operation. */ <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process( TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction); /** * Apply to a two input operation on this and other {@link NonKeyedPartitionStream}. * * @param other {@link NonKeyedPartitionStream} to perform operation with two input. * @param processFunction to perform operation. * @return new stream with this operation. */ <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess( NonKeyedPartitionStream<T_OTHER> other, TwoInputStreamProcessFunction<TTwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction); /** * Apply a two input operation to this and other {@link BroadcastStream}. * * @param processFunction to perform operation. * @return new stream with this operation. */ <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess( BroadcastStream<T_OTHER> other, BroadcastTwoInputStreamProcessFunction<TTwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction); /** * Coalesce this stream to a {@link GlobalStream}. * * @return the coalesced global stream. */ GlobalStream<T> global(); /** * Transform this stream to a {@link KeyedPartitionStream}. * * @param keySelector to decide how to map data to partition. * @return the transformed stream partitioned by key. 
*/ <K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector); /** * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between * these two streams. * * @return the transformed stream after shuffle. */ NonKeyedPartitionStream<T> shuffle(); /** * Transform this stream to a new {@link BroadcastStream}. * * @return the transformed {@link BroadcastStream}. */ BroadcastStream<T> broadcast(); /** * Sink data from this stream. * * @param sink to receive data from this stream. */ void toSink(Sink<T> sink); /** * This class represents a combination of two {@link NonKeyedPartitionStream}. It will be used * as the return value of operation with two output. */ interface TwoNonKeyedPartitionStreams<T1, T2> { /** Get the first stream. */ NonKeyedPartitionStream<T1> getFirst(); /** Get the second stream. */ NonKeyedPartitionStream<T2> getSecond(); } } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * This class represents a kind of partitioned data stream. For this stream, Each key group is a * partition, and the partition to which the data belongs is determined. */ public interface KeyedPartitionStream<K, T> { /** * Apply an operation to this {@link KeyedPartitionStream}. * * <p>This method is used to avoid shuffle after applying the process function. It is required * that for the same record, the new {@link KeySelector} must extract the same key as the * original {@link KeySelector} on this {@link KeyedPartitionStream}. * * @param processFunction to perform operation. * @param newKeySelector to select the key after process. * @return new {@link KeyedPartitionStream} with this operation. */ <OUT> KeyedPartitionStream<K, OUT> process( OneInputStreamProcessFunction<T, OUT> processFunction, KeySelector<OUT, K> newKeySelector); /** * Apply an operation to this {@link KeyedPartitionStream}; * * @param processFunction to perform operation. * @return new {@link NonKeyedPartitionStream} with this operation. */ <OUT> NonKeyedPartitionStream<OUT> process( OneInputStreamProcessFunction<T, OUT> processFunction); /** * Apply a two output operation to this {@link KeyedPartitionStream}. * * <p>This method is used to avoid shuffle after applying the process function. It is required * that for the same record, these new two {@link KeySelector}s must extract the same key as the * original {@link KeySelector}s on this {@link KeyedPartitionStream}. * * @param processFunction to perform two output operation. * @param keySelector1 to select the key of first output. * @param keySelector2 to select the key of second output. * @return new {@link TwoKeyedPartitionStream} with this operation. */ <OUT1, OUT2> TwoKeyedPartitionStreams<K, OUT1, OUT2> process( TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction, KeySelector<OUT1, K> keySelector1, KeySelector<OUT2, K> keySelector2); /** * Apply a two output operation to this {@link KeyedPartitionStream}. 
* * @param processFunction to perform two output operation. * @return new {@link TwoNonKeyedPartitionStream} with this operation. */ <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process( TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction); /** * Apply a two input operation to this and other {@link KeyedPartitionStream}. The two keyed * streams must have the same partitions, otherwise it makes no sense to connect them. * * @param other {@link KeyedPartitionStream} to perform operation with two input. * @param processFunction to perform operation. * @return new {@link NonKeyedPartitionStream} with this operation. */ <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess( KeyedPartitionStream<K, T_OTHER> other, TwoInputStreamProcessFunction<TTwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction); /** * Apply a two input operation to this and other {@link KeyedPartitionStream}.The two keyed * streams must have the same partitions, otherwise it makes no sense to connect them. * * <p>This method is used to avoid shuffle after applying the process function. It is required * that for the same record, the new {@link KeySelector} must extract the same key as the * original {@link KeySelector}s on these two {@link KeyedPartitionStream}s. * * @param other {@link KeyedPartitionStream} to perform operation with two input. * @param processFunction to perform operation. * @param newKeySelector to select the key after process. * @return new {@link KeyedPartitionStream} with this operation. */ <T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess( KeyedPartitionStream<K, T_OTHER> other, TwoInputStreamProcessFunction<TTwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction, KeySelector<OUT, K> newKeySelector); /** * Apply a two input operation to this and other {@link BroadcastStream}. * * @param processFunction to perform operation. * @return new stream with this operation. 
*/ <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess( BroadcastStream<T_OTHER> other, BroadcastTwoInputStreamProcessFunction<TTwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction); /** * Apply a two input operation to this and other {@link BroadcastStream}. * * <p>This method is used to avoid shuffle after applying the process function. It is required * that for the record from non-broadcast input, the new {@link KeySelector} must extract the * same key as the original {@link KeySelector}s on the {@link KeyedPartitionStream}. For the * record from broadcast input, the output key from keyed partition itself instead of new key * selector, so it is safe already. * * @param other {@link BroadcastStream} to perform operation with two input. * @param processFunction to perform operation. * @param newKeySelector to select the key after process. * @return new {@link KeyedPartitionStream} with this operation. */ <T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess( BroadcastStream<T_OTHER> other, BroadcastTwoInputStreamProcessFunction<TTwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction, KeySelector<OUT, K> newKeySelector); /** * Coalesce this stream to a {@link GlobalStream}. * * @return the coalesced global stream. */ GlobalStream<T> global(); /** * Transform this stream to a new {@link KeyedPartitionStream}. * * @param keySelector to decide how to map data to partition. * @return the transformed stream partitioned by key. */ <NEW_KEY> KeyedPartitionStream<NEW_KEY, T> keyBy(KeySelector<T, NEW_KEY> keySelector); /** * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between * these two streams. * * @return the transformed stream after shuffle. */ NonKeyedPartitionStream<T> shuffle(); /** * Transform this stream to a new {@link BroadcastStream}. * * @return the transformed {@link BroadcastStream}. */ BroadcastStream<T> broadcast(); /** * Sink data from this stream. 
* * @param sink to receive data from this stream. */ void toSink(Sink<T> sink); /** * This class represents a combination of two {@link KeyedPartitionStream}. It will be used as * the return value of operation with two output. */ interface TwoKeyedPartitionStreams<K, T1, T2> { /** Get the first stream. */ KeyedPartitionStream<K, T1> getFirst(); /** Get the second stream. */ KeyedPartitionStream<K, T2> getSecond(); } } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** This class represents a stream that force single parallelism. */ public interface GlobalStream<T> { /** * Apply an operation to this {@link GlobalStream}; * * @param processFunction to perform operation. * @return new stream with this operation. */ <OUT> GlobalStream<OUT> process( OneInputStreamProcessFunction<T, OUT> processFunction); /** * Apply a two output operation to this {@link GlobalStream}. * * @param processFunction to perform two output operation. * @return new stream with this operation. */ <OUT1, OUT2> TwoGlobalStream<OUT1, OUT2> process( TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction); /** * Apply a two input operation to this and other {@link GlobalStream}. * * @param other {@link GlobalStream} to perform operation with two input. * @param processFunction to perform operation. * @return new stream with this operation. */ <T_OTHER, OUT> GlobalStream<OUT> connectAndProcess( GlobalStream<T_OTHER> other, TwoInputStreamProcessFunction<TTwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction); /** * Transform this stream to a {@link KeyedPartitionStream}. * * @param keySelector to decide how to map data to partition. * @return the transformed stream partitioned by key. */ <K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector); /** * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between * these two streams. * * @return the transformed stream after shuffle. */ NonKeyedPartitionStream<T> shuffle(); /** * Transform this stream to a new {@link BroadcastStream}. * * @return the transformed {@link BroadcastStream}. */ BroadcastStream<T> broadcast(); /** * Sink data from this stream. * * @param sink to receive data from this stream. */ void toSink(Sink<T> sink); /** * This class represents a combination of two {@link GlobalStream}. It will be used as the * return value of operation with two output. */ interface TwoGlobalStream<T1, T2> { /** Get the first stream. 
*/ GlobalStream<T1> getFirst(); /** Get the second stream. */ GlobalStream<T2> getSecond(); } } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** This class represents a stream that each parallel task processes the same data. */ public interface BroadcastStream<T> { /** * Apply a two input operation to this and other {@link KeyedPartitionStream}. * * @param other {@link KeyedPartitionStream} to perform operation with two input. * @param processFunction to perform operation. * @return new stream with this operation. */ <K, T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess( KeyedPartitionStream<K, T_OTHER> other, BroadcastTwoInputStreamProcessFunction<TTwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction); /** * Apply a two input operation to this and other {@link NonKeyedPartitionStream}. * * @param other {@link NonKeyedPartitionStream} to perform operation with two input. * @param processFunction to perform operation. * @return new stream with this operation. */ <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess( NonKeyedPartitionStream<T_OTHER> other, BroadcastTwoInputStreamProcessFunction<TTwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction); /** * Apply a two input operation to this and other {@link KeyedPartitionStream}. * * <p>This method is used to avoid shuffle after applying the process function. It is required * that for the record from non-broadcast input, the new {@link KeySelector} must extract the * same key as the original {@link KeySelector}s on the {@link KeyedPartitionStream}. For the * record from broadcast input, the output key from keyed partition itself instead of new key * selector, so it is safe already. * * @param other {@link KeyedPartitionStream} to perform operation with two input. * @param processFunction to perform operation. * @param newKeySelector to select the key after process. * @return new {@link KeyedPartitionStream} with this operation. 
*/ <K, T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess( KeyedPartitionStream<K, T_OTHER> other, BroadcastTwoInputStreamProcessFunction<TTwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction, KeySelector<OUT, K> newKeySelector); } |
...