Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleBroadcastTwoInputStreamProcessFunction.java
/**
 * This interface contains all logic related to processing records from a broadcast stream and a
 * non-broadcast stream.
 *
 * @param <IN1> type of the records from the non-broadcast input.
 * @param <IN2> type of the records from the broadcast input.
 * @param <OUT> type of the output records.
 */
public interface BroadcastTwoInputStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    /**
     * Process a record from the non-broadcast input and emit data through {@link Collector}.
     *
     * @param record to process.
     * @param output to emit processed records.
     * @param ctx runtime context in which this function is executed.
     * @throws Exception if processing the record fails.
     */
    void processRecordFromNonBroadcastInput(IN1 record, Collector<OUT> output, RuntimeContext ctx)
            throws Exception;

    /**
     * Process a record from the broadcast input. In general, the broadcast side is not allowed to
     * manipulate state or output data directly, because it corresponds to all partitions instead of
     * a single partition. Instead, use the {@link BroadcastContext} to process all the partitions at
     * once.
     *
     * @param record to process.
     * @param ctx the broadcast context in which this function is executed.
     * @throws Exception if processing the record fails.
     */
    void processRecordFromBroadcastInput(IN2 record, BroadcastContext<OUT> ctx) throws Exception;

    /**
     * Life-cycle method indicating that this function will no longer receive any data from the
     * non-broadcast input. This allows the function to emit results all at once rather than upon
     * each record.
     *
     * <p>The default implementation does nothing.
     *
     * @param output to emit records.
     * @param ctx runtime context in which this function is executed.
     */
    default void endNonBroadcastInput(Collector<OUT> output, RuntimeContext ctx) {}

    /**
     * Life-cycle method indicating that this function will no longer receive any data from the
     * broadcast input.
     *
     * <p>The default implementation does nothing.
     *
     * @param ctx the broadcast context in which this function is executed.
     */
    default void endBroadcastInput(BroadcastContext<OUT> ctx) {}
}


/**
 * Context associated with broadcasting. It gives the broadcast side of a two-input process function
 * a way to act on all partitions instead of a single one.
 *
 * @param <OUT> type of the records the applied function may emit.
 */
public interface BroadcastContext<OUT> {
    /**
     * Applies the given function to all partitions. On a keyed stream the function is applied to
     * every key; on a non-keyed stream it is applied to a single partition.
     *
     * @param applyPartitionFunction the function to run against each partition.
     */
    void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction);
}

/**
 * A function that is applied to all partitions, one partition at a time.
 *
 * @param <OUT> type of the records emitted through the collector.
 */
@FunctionalInterface
public interface ApplyPartitionFunction<OUT> {
    /**
     * Runs this function against a single partition.
     *
     * @param collector used to emit output records.
     * @param ctx the runtime context in which this function is executed.
     * @throws Exception any error raised while processing the partition.
     */
    void apply(Collector<OUT> collector, RuntimeContext ctx) throws Exception;
}

...

Code Block
languagejava
titleNonKeyedPartitionStream.java
/**
 * This class represents a kind of partitioned data stream. For this stream, each parallelism is a
 * partition, and the partition to which the data belongs is random.
 */
public interface NonKeyedPartitionStream<T> {
    /**
     * Apply an operation to this {@link NonKeyedPartitionStream};
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <OUT> NonKeyedPartitionStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two output operation to this {@link NonKeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @return new stream with this operation.
     */
    <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply to a two input operation on this and other {@link NonKeyedPartitionStream}.
     *
     * @param other {@link NonKeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            NonKeyedPartitionStream<T_OTHER> other,
            TwoInputStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply a two input operation to this and other {@link BroadcastStream}.
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            BroadcastStream<T_OTHER> other,
            TwoInputStreamProcessFunction<TBroadcastTwoInputStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Coalesce this stream to a {@link GlobalStream}.
     *
     * @return the coalesced global stream.
     */
    GlobalStream<T> global();

    /**
     * Transform this stream to a {@link KeyedPartitionStream}.
     *
     * @param keySelector to decide how to map data to partition.
     * @return the transformed stream partitioned by key.
     */
    <K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector);

    /**
     * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between
     * these two streams.
     *
     * @return the transformed stream after shuffle.
     */
    NonKeyedPartitionStream<T> shuffle();

    /**
     * Transform this stream to a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    /**
     * Sink data from this stream.
     *
     * @param sink to receive data from this stream.
     */
    void toSink(Sink<T> sink);

    /**
     * This class represents a combination of two {@link NonKeyedPartitionStream}. It will be used
     * as the return value of operation with two output.
     */
    interface TwoNonKeyedPartitionStreams<T1, T2> {
        /** Get the first stream. */
        NonKeyedPartitionStream<T1> getFirst();

        /** Get the second stream. */
        NonKeyedPartitionStream<T2> getSecond();
    }
}

...

Code Block
languagejava
titleKeyedPartitionStream.java
/**
 * This class represents a kind of partitioned data stream. For this stream, Each key group is a
 * partition, and the partition to which the data belongs is determined.
 */
public interface KeyedPartitionStream<K, T> {
    /**
     * Apply an operation to this {@link KeyedPartitionStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the same record, the new {@link KeySelector} must extract the same key as the
     * original {@link KeySelector} on this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <OUT> KeyedPartitionStream<K, OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);

    /**
     * Apply an operation to this {@link KeyedPartitionStream};
     *
     * @param processFunction to perform operation.
     * @return new {@link NonKeyedPartitionStream} with this operation.
     */
    <OUT> NonKeyedPartitionStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two output operation to this {@link KeyedPartitionStream}.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the same record, these new two {@link KeySelector}s must extract the same key as the
     * original {@link KeySelector}s on this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @param keySelector1 to select the key of first output.
     * @param keySelector2 to select the key of second output.
     * @return new {@link TwoKeyedPartitionStream} with this operation.
     */
    <OUT1, OUT2> TwoKeyedPartitionStreams<K, OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction,
            KeySelector<OUT1, K> keySelector1,
            KeySelector<OUT2, K> keySelector2);

    /**
     * Apply a two output operation to this {@link KeyedPartitionStream}.
     *
     * @param processFunction to perform two output operation.
     * @return new {@link TwoNonKeyedPartitionStream} with this operation.
     */
    <OUT1, OUT2> TwoNonKeyedPartitionStreams<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}. The two keyed
     * streams must have the same partitions, otherwise it makes no sense to connect them.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new {@link NonKeyedPartitionStream} with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputStreamProcessFunction<T, T_OTHER, OUT> processFunction);

     /**
     * Apply a two input operation to this and other {@link KeyedPartitionStream}.The two keyed
     * streams must have the same partitions, otherwise it makes no sense to connect them.
     *
     * <p>This method is used to avoid shuffle after applying the process function. It is required
     * that for the same record, the new {@link KeySelector} must extract the same key as the
     * original {@link KeySelector}s on these two {@link KeyedPartitionStream}s.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link KeyedPartitionStream} with this operation.
     */
    <T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputStreamProcessFunction<T, T_OTHER, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);

    /**
     * Apply a two input operation to this and other {@link BroadcastStream}.
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            BroadcastStream<T_OTHER> other,
            TwoInputStreamProcessFunction<TBroadcastTwoInputStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Apply Coalescea two thisinput streamoperation to athis and other {@link GlobalStreamBroadcastStream}.
     *
     * @return<p>This themethod coalescedis globalused stream.
to avoid shuffle after applying */
the process function. It GlobalStream<T> global();

is required
     /**
 that for the record *from Transform this stream to anon-broadcast input, the new {@link KeyedPartitionStreamKeySelector}.
 must extract   *the
     * @paramsame keySelectorkey toas decidethe howoriginal to map data to partition.{@link KeySelector}s on the {@link KeyedPartitionStream}. For the
     * @returnrecord from broadcast input, the transformedoutput streamkey partitionedfrom bykeyed key.
partition itself instead of new */key
     <NEW_KEY> KeyedPartitionStream<NEW_KEY, T> keyBy(KeySelector<T, NEW_KEY> keySelector);

* selector, so it is safe already.
     /**
     * Transform this stream to a new@param other {@link NonKeyedPartitionStreamBroadcastStream}, to dataperform willoperation bewith shuffledtwo betweeninput.
     * these@param processFunction twoto streamsperform operation.
     * @param newKeySelector to select the key after process.
     * @return the transformed stream after shufflenew {@link KeyedPartitionStream} with this operation.
     */
    NonKeyedPartitionStream<T> shuffle();
<T_OTHER, OUT> ProcessConfigurableAndKeyedPartitionStream<K, OUT> connectAndProcess(
    /**
     * Transform this stream to a new {@link BroadcastStream}.
BroadcastStream<T_OTHER> other,
         *
   BroadcastTwoInputStreamProcessFunction<T, T_OTHER, *OUT> @returnprocessFunction,
 the transformed {@link BroadcastStream}.
     */
   KeySelector<OUT, BroadcastStream<T>K> broadcast(newKeySelector);

      /**
     * Coalesce Sinkthis stream datato froma this{@link streamGlobalStream}.
     *
     * @param@return sinkthe to receive data from this coalesced global stream.
     */
    voidGlobalStream<T> toSinkglobal(Sink<T> sink);

    /**
     * ThisTransform this classstream representsto a combination of twonew {@link KeyedPartitionStream}.
 It will be used as*
     * @param keySelector theto returndecide valuehow ofto operationmap withdata twoto outputpartition.
     */
 @return the transformed interfacestream TwoKeyedPartitionStreams<K,partitioned T1, T2> {by key.
     */
   /** Get the first stream. */
        KeyedPartitionStream<K, T1> getFirst();
 <NEW_KEY> KeyedPartitionStream<NEW_KEY, T> keyBy(KeySelector<T, NEW_KEY> keySelector);

    /**
     * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between
     * these two streams.
     *
   /*  * Get@return the secondtransformed stream. */ after shuffle.
     */
   KeyedPartitionStream<K, T2>NonKeyedPartitionStream<T> getSecondshuffle();

    }
}

GlobalStream

/**
     * Transform this stream to a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    /**
     * Sink data from this stream.
     *
     * @param sink to receive data from this stream.
     */
    void toSink(Sink<T> sink);

    /**
     * This class represents a combination of two {@link KeyedPartitionStream}. It will be used as
     * the return value of operation with two output.
     */
    interface TwoKeyedPartitionStreams<K, T1, T2> {
        /** Get the first stream. */
        KeyedPartitionStream<K, T1> getFirst();

        /** Get the second stream. */
        KeyedPartitionStream<K, T2> getSecond();
    }
}

GlobalStream

Code Block
languagejava
titleGlobalStream.java
/** This class represents a stream that force single parallelism. */
public interface GlobalStream<T> {
    /**
     * Apply an operation to this {@link GlobalStream};
     *
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <OUT> GlobalStream<OUT> process(
            OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Apply a two output operation to this {@link GlobalStream}.
     *
     * @param processFunction to perform two output operation.
     * @return new stream with this operation.
     */
    <OUT1, OUT2> TwoGlobalStream<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Apply a two input operation to this and other {@link GlobalStream}.
     *
     * @param other {@link GlobalStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
    <T_OTHER, OUT> GlobalStream<OUT> connectAndProcess(
            GlobalStream<T_OTHER> other,
            TwoInputStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Transform this stream to a {@link KeyedPartitionStream}.
     *
     * @param keySelector to decide how to map data to partition.
     * @return the transformed stream partitioned by key.
     */
    <K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector);
Code Block
languagejava
titleGlobalStream.java
/** This class represents a stream that force single parallelism. */
public interface GlobalStream<T> {
    /**
     * ApplyTransform anthis operationstream to thisa new {@link GlobalStreamNonKeyedPartitionStream};
, data will be shuffled *between
     * @param processFunction to perform operation. these two streams.
     *
     * @return newthe transformed stream withafter this operationshuffle.
     */
    <OUT>NonKeyedPartitionStream<T> GlobalStream<OUT> process(shuffle();

    /**
     * Transform this OneInputStreamProcessFunction<T,stream OUT> processFunction);

    /**to a new {@link BroadcastStream}.
     *
 Apply  a two output* operation@return tothe thistransformed {@link GlobalStreamBroadcastStream}.
     */
     * @param processFunction to perform two output operation.BroadcastStream<T> broadcast();

    /**
     * @returnSink newdata stream withfrom this operationstream.
     */
     * <OUT1,@param OUT2>sink TwoGlobalStream<OUT1,to OUT2>receive process(
data from this stream.
     */
    TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunctionvoid toSink(Sink<T> sink);

    /**
     * ApplyThis aclass tworepresents inputa operationcombination toof this andtwo other {@link GlobalStream}.
 It will be used as *the
     * @paramreturn other {@link GlobalStream} to performvalue of operation with two inputoutput.
     */
 @param processFunction to perform operation.
  interface TwoGlobalStream<T1, T2> {
   * @return new stream with this operation.
    /** Get the first stream. */
    <T_OTHER, OUT> GlobalStream<OUT> connectAndProcess(
        GlobalStream<T1> getFirst();

        GlobalStream<T_OTHER> other,
    /** Get the second stream. */
        TwoInputStreamProcessFunction<T, T_OTHER, OUT> processFunctionGlobalStream<T2> getSecond();

    }
}

BroadcastStream

Code Block
languagejava
titleBroadcastStream.java
/**
 This class represents a stream *that Transformeach thisparallel streamtask toprocesses athe {@linksame KeyedPartitionStream}data. */
public interface BroadcastStream<T> {
    /**
     * @paramApply keySelectora totwo decideinput howoperation to this mapand dataother to{@link partitionKeyedPartitionStream}.
     *
 @return the transformed stream partitioned* by@param key.
other {@link KeyedPartitionStream} to perform */
operation with two input.
 <K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector);

    /*** @param processFunction to perform operation.
     * Transform@return thisnew stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between
     * these two streams.
with this operation.
     */
    <K, T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
        *
    KeyedPartitionStream<K, * @return the transformed stream after shuffle.
T_OTHER> other,
          */
  BroadcastTwoInputStreamProcessFunction<T, T_OTHER, NonKeyedPartitionStream<T>OUT> shuffle(processFunction);

     /**
     * Apply a Transformtwo thisinput streamoperation to this aand newother {@link BroadcastStreamNonKeyedPartitionStream}.
     *
     * @return@param theother transformed {@link BroadcastStream}NonKeyedPartitionStream} to perform operation with two input.
     */
 @param processFunction to BroadcastStream<T> broadcast();

perform operation.
     /*** @return new stream with this operation.
     */
 Sink data from this stream.   <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
     *
     * @param sink to receive data from this stream.
NonKeyedPartitionStream<T_OTHER> other,
          */
  BroadcastTwoInputStreamProcessFunction<T, T_OTHER, void toSink(Sink<T> sinkOUT> processFunction);

    /**
     * This class represents a combination of twoApply a two input operation to this and other {@link GlobalStreamKeyedPartitionStream}. It will be used as the
     * return value of operation with two output.
     */
 <p>This method is interfaceused TwoGlobalStream<T1, T2> {
        /** Getto avoid shuffle after applying the firstprocess streamfunction. */It is required
     * that for GlobalStream<T1> getFirst();

        /** Get the second stream. */
        GlobalStream<T2> getSecond();
    }
}

BroadcastStream

Code Block
languagejava
titleBroadcastStream.java
/** This class represents a stream that each parallel task processes the same data. */
public interface BroadcastStream<T> {
    /**the record from non-broadcast input, the new {@link KeySelector} must extract the
     * same key as the original {@link KeySelector}s on the {@link KeyedPartitionStream}. For the
     * record from broadcast input, the output key from keyed partition itself instead of new key
     * Applyselector, aso twoit inputis operation to this and other {@link KeyedPartitionStream}safe already.
     *
     * @param other {@link KeyedPartitionStream} to perform operation with two input.
     * @param processFunction to perform operation.
     * @param newKeySelector to select the key after process.
     * @return new {@link streamKeyedPartitionStream} with this operation.
     */
    <K, T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            KeyedPartitionStream<KKeyedPartitionStream.ProcessConfigurableAndKeyedPartitionStream<K, T_OTHER> other,
     OUT>
       TwoInputStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * ApplyconnectAndProcess(
 a two input operation to this and other {@link NonKeyedPartitionStream}.
     *
     * @param other {@link NonKeyedPartitionStream} to perform operation with two input.KeyedPartitionStream<K, T_OTHER> other,
     * @param processFunction to perform operation.
     * @return new stream with this operation.
     */
  BroadcastTwoInputStreamProcessFunction<T,  <TT_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
processFunction,
              NonKeyedPartitionStream<T_OTHER> other,
            BroadcastTwoInputStreamProcessFunction<T, T_OTHERKeySelector<OUT, OUT>K> processFunctionnewKeySelector); 
}

Similar to sources, we only support SinkV2-based sinks.

...