Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Process Function

number of inputs

number of outputs

OneInputStreamProcessFunction

1

1

TwoInputNonBroadcastStreamProcessFunction

2

1

TwoInputBroadcastStreamProcessFunction

2

1

TwoOutputStreamProcessFunction

1

2

...

The case of two inputs is somewhat special, so we have divided it into two categories:

  1. TwoInputNonBroadcastStreamProcessFunction: Neither of its inputs is a BroadcastStream, so processing is only applied to a single partition.
  2. TwoInputBroadcastStreamProcessFunction: One of its inputs is a BroadcastStream, so the processing of this input is applied to all partitions. The other side is a Keyed/Non-Keyed Stream, so its processing is applied to a single partition.

...

Code Block
languagejava
titleCollector.java
/**
 * This interface represents the context associated with all operations that must be applied to
 * all partitions.
 *
 * @param <OUT> type of the records emitted by the function applied to each partition.
 */
public interface NonPartitionedContext<OUT> extends RuntimeContext {
    /**
     * Apply a function to all partitions. For a keyed stream, it will apply to all keys. For a
     * non-keyed stream, it will apply to a single partition.
     *
     * @param applyPartitionFunction the function to apply to each partition.
     */
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}

/**
 * A function to be applied to all partitions.
 *
 * @param <OUT> type of the records emitted through the collector.
 */
@FunctionalInterface
public interface ApplyPartitionFunction<OUT> {
    /**
     * The actual method to be applied to each partition.
     *
     * @param collector to output data.
     * @param ctx runtime context in which this function is executed.
     * @throws Exception if the implementation fails while processing the partition.
     */
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}

/**
 * This interface represents the context associated with all operations that must be applied to
 * all partitions with two outputs.
 *
 * @param <OUT1> type of the records emitted to the first output.
 * @param <OUT2> type of the records emitted to the second output.
 */
public interface TwoOutputNonPartitionedContext<OUT1, OUT2> extends RuntimeContext {
    /**
     * Apply a function to all partitions. For a keyed stream, it will apply to all keys. For a
     * non-keyed stream, it will apply to a single partition.
     *
     * @param applyPartitionFunction the two-output function to apply to each partition.
     */
    void applyToAllPartitions(TwoOutputApplyPartitionFunction<OUT1, OUT2> applyPartitionFunction);
}

/**
 * A function to be applied to all partitions with two outputs.
 *
 * @param <OUT1> type of the records emitted to the first output.
 * @param <OUT2> type of the records emitted to the second output.
 */
@FunctionalInterface
public interface TwoOutputApplyPartitionFunction<OUT1, OUT2> {
    /**
     * The actual method to be applied to each partition.
     *
     * @param firstOutput to emit record to first output.
     * @param secondOutput to emit record to second output.
     * @param ctx runtime context in which this function is executed.
     * @throws Exception if the implementation fails while processing the partition.
     */
    void apply(Collector<OUT1> firstOutput, Collector<OUT2> secondOutput, RuntimeContext ctx)
            throws Exception;
}

...

Code Block
languagejava
titleOneInputStreamProcessFunction.java
/**
 * This interface contains all logic related to processing records from a single input.
 *
 * @param <IN> type of the input records.
 * @param <OUT> type of the records emitted by this function.
 */
public interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction {
    /**
     * Process record and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     * @throws Exception if the implementation fails while processing the record.
     */
    void processRecord(IN record, Collector<OUT> output, RuntimeContext ctx) throws Exception;

    /**
     * This is a life-cycle method that indicates that this function will no longer receive any
     * input data.
     *
     * @param ctx the context in which this function is executed.
     */
    // Parameterized with OUT (instead of the raw type) so that functions applied through the
    // context are type-checked against this function's output type.
    default void endInput(NonPartitionedContext<OUT> ctx) {}
}

...

TwoInputNonBroadcastStreamProcessFunction

Code Block
languagejava
titleTwoInputNonBroadcastStreamProcessFunction.java
/** This class contains all logical related to process records from two input. */
public interface TwoInputStreamProcessFunction<IN1TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    /**
     * Process record from first input and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx, runtime context in which this function is executed.
     */
    void processRecordFromFirstInput(IN1 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    /**
     * Process record from second input and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx, runtime context in which this function is executed.
     */
    void processRecordFromSecondInput(IN2 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    /**
     * This is a life-cycle method indicates that this function will no longer receive any data from
     * the first input.
     *
     * @param ctx, the context in which this function is executed.
     */
    default void endFirstInput(NonPartitionedContext ctx) {}

    /**
     * This is a life-cycle method indicates that this function will no longer receive any data from
     * the second input.
     *
     * @param ctx, the context in which this function is executed.
     */
    default void endSecondInput(NonPartitionedContext ctx) {}
}

...

TwoInputBroadcastStreamProcessFunction

Code Block
languagejava
titleTwoInputBroadcastStreamProcessFunction.java
/**
 * This class contains all logical related to process records from a broadcast stream and a
 * non-broadcast stream.
 */
public interface BroadcastTwoInputStreamProcessFunction<IN1TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    /**
     * Process record from non-broadcast input and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx, runtime context in which this function is executed.
     */
    void processRecordFromNonBroadcastInput(IN1 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    /**
     * Process record from broadcast input.In general, the broadcast side is not allowed to
     * manipulate state and output data because it corresponds to all partitions instead of a single
     * partition. But you could use broadcast context to process all the partitions at once.
     *
     * @param record to process.
     * @param ctx, the context in which this function is executed.
     */
    void processRecordFromBroadcastInput(IN2 record, NonPartitionedContext<OUT> ctx) throws Exception;

    /**
     * This is a life-cycle method indicates that this function will no longer receive any data from
     * the non-broadcast input.
     *
     * @param ctx, the context in which this function is executed.
     */
    default void endNonBroadcastInput(NonPartitionedContext ctx) {}

    /**
     * This is a life-cycle method indicates that this function will no longer receive any data from
     * the broadcast input.
     *
     * @param ctx, the context in which this function is executed.
     */
    default void endBroadcastInput(NonPartitionedContext<OUT> ctx) {}
}

...

Code Block
languagejava
titleNonKeyedPartitionStream.java
/**
 * This class represents a kind of partitioned data stream. For this stream, each parallelism is a
 * partition, and the partition to which the data belongs is random.
 */
public interface NonKeyedPartitionStream<T> {
    /**
     * Apply an operation to this {@link NonKeyedPartitionStream};
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <OUT> NonKeyedPartitionStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two output operation to this {@link NonKeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @return new stream with this operation.
     */
    <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply to a two input operation on this and other {@link NonKeyedPartitionStream}.
     *
     * @param other {@link NonKeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            NonKeyedPartitionStream<T_OTHER> other,
            TwoInputStreamProcessFunction<TTwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link BroadcastStream}.
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            BroadcastStream<T_OTHER> other,
            BroadcastTwoInputStreamProcessFunction<TTwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Coalesce this stream to a {@link GlobalStream}.
     *
     * @return the coalesced global stream.
     */
    GlobalStream<T> global();

    /**
     * Transform this stream to a {@link KeyedPartitionStream}.
     *
     * @param keySelector to decide how to map data to partition.
     * @return the transformed stream partitioned by key.
     */
    <K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector);

    /**
     * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between
     * these two streams.
     *
     * @return the transformed stream after shuffle.
     */
    NonKeyedPartitionStream<T> shuffle();

    /**
     * Transform this stream to a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    /**
     * Sink data from this stream.
     *
     * @param sink to receive data from this stream.
     */
    void toSink(Sink<T> sink);

    /**
     * This class represents a combination of two {@link NonKeyedPartitionStream}. It will be used
     * as the return value of operation with two output.
     */
    interface TwoNonKeyedPartitionStreams<T1, T2> {
        /** Get the first stream. */
        NonKeyedPartitionStream<T1> getFirst();

        /** Get the second stream. */
        NonKeyedPartitionStream<T2> getSecond();
    }
}

...

Code Block
languagejava
titleKeyedPartitionStream.java
/**
 * This class represents a kind of partitioned data stream. For this stream, Each key group is a
 * partition, and the partition to which the data belongs is determined.
 */
public interface KeyedPartitionStream<K, T> {
    /**
     * Apply an operation to this {@link KeyedPartitionStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the same record, the new {@link KeySelector} must extract the same key as the
     * original {@link KeySelector} on this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <OUT> KeyedPartitionStream<K, OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);

    /**
     * Apply an operation to this {@link KeyedPartitionStream};
     *
     * @param processFunction to perform operation.
     * @return new {@link NonKeyedPartitionStream} with this operation.
     */
    <OUT> NonKeyedPartitionStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two output operation to this {@link KeyedPartitionStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the same record, these new two {@link KeySelector}s must extract the same key as the
     * original {@link KeySelector}s on this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @param keySelector1 to select the key of first output.
     * @param keySelector2 to select the key of second output.
     * @return new {@link TwoKeyedPartitionStream} with this operation.
     */
    <OUT1, OUT2> TwoKeyedPartitionStreams<K, OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction,
            KeySelector<OUT1, K> keySelector1,
            KeySelector<OUT2, K> keySelector2);

    /**
     * Apply a two output operation to this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @return new {@link TwoNonKeyedPartitionStream} with this operation.
     */
    <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}. The two keyed
     * streams must have the same partitions, otherwise it makes no sense to connect them.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new {@link NonKeyedPartitionStream} with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputStreamProcessFunction<TTwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

     /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}.The two keyed
     * streams must have the same partitions, otherwise it makes no sense to connect them.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the same record, the new {@link KeySelector} must extract the same key as the
     * original {@link KeySelector}s on these two {@link KeyedPartitionStream}s.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputStreamProcessFunction<TTwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);

    /**
     * Apply a two input operation to this and other {@link BroadcastStream}.
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            BroadcastStream<T_OTHER> other,
            BroadcastTwoInputStreamProcessFunction<TTwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link BroadcastStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the record from non-broadcast input, the new {@link KeySelector} must extract the
     * same key as the original {@link KeySelector}s on the {@link KeyedPartitionStream}. For the
     * record from broadcast input, the output key from keyed partition itself instead of new key
     * selector, so it is safe already.
     *
     * @param other {@link BroadcastStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
            BroadcastStream<T_OTHER> other,
            BroadcastTwoInputStreamProcessFunction<TTwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);

    /**
     * Coalesce this stream to a {@link GlobalStream}.
     *
     * @return the coalesced global stream.
     */
    GlobalStream<T> global();

    /**
     * Transform this stream to a new {@link KeyedPartitionStream}.
     *
     * @param keySelector to decide how to map data to partition.
     * @return the transformed stream partitioned by key.
     */
    <NEW_KEY> KeyedPartitionStream<NEW_KEY, T> keyBy(KeySelector<T, NEW_KEY> keySelector);

    /**
     * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between
     * these two streams.
     *
     * @return the transformed stream after shuffle.
     */
    NonKeyedPartitionStream<T> shuffle();

    /**
     * Transform this stream to a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    /**
     * Sink data from this stream.
     *
     * @param sink to receive data from this stream.
     */
    void toSink(Sink<T> sink);

    /**
     * This class represents a combination of two {@link KeyedPartitionStream}. It will be used as
     * the return value of operation with two output.
     */
    interface TwoKeyedPartitionStreams<K, T1, T2> {
        /** Get the first stream. */
        KeyedPartitionStream<K, T1> getFirst();

        /** Get the second stream. */
        KeyedPartitionStream<K, T2> getSecond();
    }
}

...

Code Block
languagejava
titleGlobalStream.java
/** This class represents a stream that force single parallelism. */
public interface GlobalStream<T> {
    /**
     * Apply an operation to this {@link GlobalStream};
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <OUT> GlobalStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two output operation to this {@link GlobalStream}.
     *
     * @param processFunction to perform two output operation.
     * @return new stream with this operation.
     */
    <OUT1, OUT2> TwoGlobalStream<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply a two input operation to this and other {@link GlobalStream}.
     *
     * @param other {@link GlobalStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> GlobalStream<OUT> connectAndProcess(
            GlobalStream<T_OTHER> other,
            TwoInputStreamProcessFunction<TTwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Transform this stream to a {@link KeyedPartitionStream}.
     *
     * @param keySelector to decide how to map data to partition.
     * @return the transformed stream partitioned by key.
     */
    <K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector);

    /**
     * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between
     * these two streams.
     *
     * @return the transformed stream after shuffle.
     */
    NonKeyedPartitionStream<T> shuffle();

    /**
     * Transform this stream to a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    /**
     * Sink data from this stream.
     *
     * @param sink to receive data from this stream.
     */
    void toSink(Sink<T> sink);

    /**
     * This class represents a combination of two {@link GlobalStream}. It will be used as the
     * return value of operation with two output.
     */
    interface TwoGlobalStream<T1, T2> {
        /** Get the first stream. */
        GlobalStream<T1> getFirst();

        /** Get the second stream. */
        GlobalStream<T2> getSecond();
    }
}

...

Code Block
languagejava
titleBroadcastStream.java
/** This class represents a stream that each parallel task processes the same data. */
public interface BroadcastStream<T> {
    /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <K, T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            BroadcastTwoInputStreamProcessFunction<TTwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

   /**
     * Apply a two input operation to this and other {@link NonKeyedPartitionStream}.
     *
     * @param other {@link NonKeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            NonKeyedPartitionStream<T_OTHER> other,
            BroadcastTwoInputStreamProcessFunction<TTwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

   /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the record from non-broadcast input, the new {@link KeySelector} must extract the
     * same key as the original {@link KeySelector}s on the {@link KeyedPartitionStream}. For the
     * record from broadcast input, the output key from keyed partition itself instead of new key
     * selector, so it is safe already.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <K, T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            BroadcastTwoInputStreamProcessFunction<TTwoInputBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector); 
}

...