...

Regarding the second point, process, connectAndProcess and other operations performed on DataStreams will return an instance of the corresponding interface. We will change the return types of the one-input, two-output and two-input process methods and of the toSink method, as shown in the table below.


| Stream type | Method | Old return type | New return type |
|---|---|---|---|
| NonKeyedPartitionStream | process(OneInputStreamProcessFunction) | NonKeyedPartitionStream | ProcessConfigurableAndNonKeyedPartitionStream |
| NonKeyedPartitionStream | process(TwoOutputStreamProcessFunction) | TwoNonKeyedPartitionStreams | ProcessConfigurableAndTwoNonKeyedPartitionStreams |
| NonKeyedPartitionStream | connectAndProcess(NonKeyedPartitionStream, TwoInputNonBroadcastStreamProcessFunction) | NonKeyedPartitionStream | ProcessConfigurableAndNonKeyedPartitionStream |
| NonKeyedPartitionStream | connectAndProcess(BroadcastStream, TwoInputBroadcastStreamProcessFunction) | NonKeyedPartitionStream | ProcessConfigurableAndNonKeyedPartitionStream |
| NonKeyedPartitionStream | toSink(Sink) | void | ProcessConfigurable |
| KeyedPartitionStream | process(OneInputStreamProcessFunction, KeySelector) | KeyedPartitionStream | ProcessConfigurableAndKeyedPartitionStream |
| KeyedPartitionStream | process(OneInputStreamProcessFunction) | NonKeyedPartitionStream | ProcessConfigurableAndNonKeyedPartitionStream |
| KeyedPartitionStream | process(TwoOutputStreamProcessFunction, KeySelector, KeySelector) | TwoKeyedPartitionStreams | ProcessConfigurableAndTwoKeyedPartitionStreams |
| KeyedPartitionStream | process(TwoOutputStreamProcessFunction) | TwoNonKeyedPartitionStreams | ProcessConfigurableAndTwoNonKeyedPartitionStreams |
| KeyedPartitionStream | connectAndProcess(KeyedPartitionStream, TwoInputNonBroadcastStreamProcessFunction) | NonKeyedPartitionStream | ProcessConfigurableAndNonKeyedPartitionStream |
| KeyedPartitionStream | connectAndProcess(KeyedPartitionStream, TwoInputNonBroadcastStreamProcessFunction, KeySelector) | KeyedPartitionStream | ProcessConfigurableAndKeyedPartitionStream |
| KeyedPartitionStream | connectAndProcess(BroadcastStream, TwoInputBroadcastStreamProcessFunction) | NonKeyedPartitionStream | ProcessConfigurableAndNonKeyedPartitionStream |
| KeyedPartitionStream | connectAndProcess(BroadcastStream, TwoInputBroadcastStreamProcessFunction, KeySelector) | KeyedPartitionStream | ProcessConfigurableAndKeyedPartitionStream |
| KeyedPartitionStream | toSink(Sink) | void | ProcessConfigurable |
| GlobalStream | process(OneInputStreamProcessFunction) | GlobalStream | ProcessConfigurableAndGlobalStream |
| GlobalStream | process(TwoOutputStreamProcessFunction) | TwoGlobalStreams | ProcessConfigurableAndTwoGlobalStreams |
| GlobalStream | connectAndProcess(GlobalStream, TwoInputNonBroadcastStreamProcessFunction) | GlobalStream | ProcessConfigurableAndGlobalStream |
| GlobalStream | toSink(Sink) | void | ProcessConfigurable |
| BroadcastStream | connectAndProcess(KeyedPartitionStream, TwoInputBroadcastStreamProcessFunction) | NonKeyedPartitionStream | ProcessConfigurableAndNonKeyedPartitionStream |
| BroadcastStream | connectAndProcess(NonKeyedPartitionStream, TwoInputBroadcastStreamProcessFunction) | NonKeyedPartitionStream | ProcessConfigurableAndNonKeyedPartitionStream |
| BroadcastStream | connectAndProcess(KeyedPartitionStream, TwoInputBroadcastStreamProcessFunction, KeySelector) | KeyedPartitionStream | ProcessConfigurableAndKeyedPartitionStream |
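The model below shows why a combined return type such as ProcessConfigurableAndNonKeyedPartitionStream is useful. It is self-contained and heavily simplified. Only the interface names come from the table. The self-typed ProcessConfigurable generic, the withName/withParallelism methods, the String-based process(...) signature and the ToyStream class are hypothetical stand-ins, not the FLIP's actual API. The point is that a caller can configure the operator that process(...) just created and then keep building the pipeline on the same object.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model of the return-type change. Only the interface names follow the table; all
 * methods and the ToyStream class are hypothetical illustrations.
 */
public class ReturnTypeSketch {

    /** Self-typed (an assumption) so each configuration call returns the combined type. */
    interface ProcessConfigurable<T extends ProcessConfigurable<T>> {
        T withName(String name); // hypothetical configuration method

        T withParallelism(int parallelism); // hypothetical configuration method
    }

    interface NonKeyedPartitionStream<T> {
        // Simplified: the real method takes a process function, not a step name.
        ProcessConfigurableAndNonKeyedPartitionStream<T> process(String stepName);
    }

    /** New return type: a stream that is also configurable. */
    interface ProcessConfigurableAndNonKeyedPartitionStream<T>
            extends NonKeyedPartitionStream<T>,
                    ProcessConfigurable<ProcessConfigurableAndNonKeyedPartitionStream<T>> {}

    /** Toy implementation that only records what was called, in order. */
    static final class ToyStream<T> implements ProcessConfigurableAndNonKeyedPartitionStream<T> {
        final List<String> log;

        ToyStream(List<String> log) {
            this.log = log;
        }

        @Override
        public ProcessConfigurableAndNonKeyedPartitionStream<T> process(String stepName) {
            log.add("process:" + stepName);
            return new ToyStream<>(log); // stands for a new downstream operator
        }

        @Override
        public ProcessConfigurableAndNonKeyedPartitionStream<T> withName(String name) {
            log.add("name:" + name);
            return this; // configures the operator that produced this stream
        }

        @Override
        public ProcessConfigurableAndNonKeyedPartitionStream<T> withParallelism(int parallelism) {
            log.add("parallelism:" + parallelism);
            return this;
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        NonKeyedPartitionStream<String> source = new ToyStream<>(log);
        // Configure the operator created by process(...) and continue the pipeline in one chain.
        source.process("parse").withName("parse-op").withParallelism(4).process("enrich");
        System.out.println(log); // [process:parse, name:parse-op, parallelism:4, process:enrich]
    }
}
```

With the old return types (for example, void for toSink), a separate handle would be needed to configure the operator. A combined interface keeps configuration and further processing in one fluent chain.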

...

  • JobInfo: Holds all job-related information, such as the job name and execution mode.

  • TaskInfo: Holds all task-related information, such as the parallelism.

  • StateManager: Manages context related to state, such as accessing a specific state.

  • WatermarkManager: Manages context related to generalized watermarks, such as triggering a watermark.

  • ProcessingTimeManager: Manages context related to processing timers, such as the current processing time.

  • TimestampManager: Manages context related to the timestamp of the record currently being processed by this function.

Their definitions are as follows:

...

Code Block
languagejava
titleProcessingTimeManager.java
/**
 * This is responsible for managing runtime information related to the processing time of a
 * process function.
 */
public interface ProcessingTimeManager {
    /**
     * Register a processing timer for this process function. The onProcessingTimer method of this
     * function will be invoked as a callback when the timer expires.
     *
     * @param timestamp to trigger timer callback.
     */
    void registerProcessingTimer(long timestamp);

    /**
     * Deletes the processing-time timer with the given trigger timestamp. This method only has an
     * effect if such a timer was previously registered and has not already expired.
     *
     * @param timestamp the timestamp of the timer to delete.
     */
    void deleteProcessingTimeTimer(long timestamp);

    /**
     * Get the current processing time.
     *
     * @return current processing time.
     */
    long currentProcessingTime();
}

Note that only processing-time timers can be registered here. This is because we no longer unify event time and processing time into a single time service. Simply put, event time can be implemented using the generalized watermark mechanism. The detailed reasons are explained in the umbrella FLIP.
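The small in-memory model below illustrates the register/delete contract of ProcessingTimeManager. It is a hypothetical sketch, not Flink's runtime. ManualProcessingTimeManager, its advanceTo method and the Consumer<Long> callback (standing in for onProcessingTimer) are assumptions made for illustration. Timers are kept in a sorted set, so deleting a timestamp that was never registered or has already fired is a no-op, as the Javadoc describes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import java.util.function.Consumer;

public class ProcessingTimeSketch {

    /** Interface as proposed above (Javadoc omitted). */
    interface ProcessingTimeManager {
        void registerProcessingTimer(long timestamp);

        void deleteProcessingTimeTimer(long timestamp);

        long currentProcessingTime();
    }

    /** Hypothetical manual-clock implementation; the callback plays the role of onProcessingTimer. */
    static final class ManualProcessingTimeManager implements ProcessingTimeManager {
        private final TreeSet<Long> timers = new TreeSet<>(); // pending timers, sorted, no duplicates
        private final Consumer<Long> onTimer;
        private long now;

        ManualProcessingTimeManager(long startTime, Consumer<Long> onTimer) {
            this.now = startTime;
            this.onTimer = onTimer;
        }

        @Override
        public void registerProcessingTimer(long timestamp) {
            timers.add(timestamp);
        }

        @Override
        public void deleteProcessingTimeTimer(long timestamp) {
            timers.remove(timestamp); // no effect if never registered or already fired
        }

        @Override
        public long currentProcessingTime() {
            return now;
        }

        /** Moves the clock forward and fires every pending timer whose timestamp is <= the new time. */
        void advanceTo(long newTime) {
            now = newTime;
            while (!timers.isEmpty() && timers.first() <= now) {
                onTimer.accept(timers.pollFirst());
            }
        }
    }

    public static void main(String[] args) {
        List<Long> fired = new ArrayList<>();
        ManualProcessingTimeManager mgr = new ManualProcessingTimeManager(0L, fired::add);
        mgr.registerProcessingTimer(mgr.currentProcessingTime() + 100);
        mgr.registerProcessingTimer(200);
        mgr.registerProcessingTimer(300);
        mgr.deleteProcessingTimeTimer(200); // cancelled before it expires
        mgr.advanceTo(250);
        mgr.deleteProcessingTimeTimer(100); // already fired: no effect
        mgr.advanceTo(400);
        System.out.println(fired); // [100, 300]
    }
}
```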

StateManager

Code Block
languagejava
titleStateManager.java
/** This is responsible for managing runtime information related to the state of a process function. */
public interface StateManager {
    /**
     * Get the key of current record.
     *
     * @return The key of current processed record for {@link KeyedPartitionStream}. {@link
     *     Optional#empty()} for other non-keyed stream.
     */
    <K> Optional<K> getCurrentKey();

    // The logic of getting states is omitted in this FLIP.
}

The logic of getting states is omitted here; it will be introduced in detail in later sub-FLIPs.
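Because getCurrentKey() returns an Optional, a function can be written once and still behave sensibly on both keyed and non-keyed streams. The sketch below assumes a hypothetical FixedKeyStateManager standing in for the runtime and a hypothetical describe helper. Since the method is declared as <K> Optional<K> getCurrentKey(), the caller chooses K at the call site. A wrong choice would only fail with a ClassCastException when the key is used.

```java
import java.util.Optional;

public class StateManagerSketch {

    /** Interface as proposed above; state access is omitted, as in the FLIP. */
    interface StateManager {
        <K> Optional<K> getCurrentKey();
    }

    /** Hypothetical stand-in for the runtime: holds the key of the record being processed. */
    static final class FixedKeyStateManager implements StateManager {
        private final Object currentKey; // null models a non-keyed stream

        FixedKeyStateManager(Object currentKey) {
            this.currentKey = currentKey;
        }

        @Override
        @SuppressWarnings("unchecked") // the caller chooses K; a wrong choice fails at the use site
        public <K> Optional<K> getCurrentKey() {
            return Optional.ofNullable((K) currentKey);
        }
    }

    /** Hypothetical helper: the same logic works on keyed and non-keyed streams. */
    static String describe(StateManager stateManager) {
        Optional<String> key = stateManager.getCurrentKey();
        return key.map(k -> "keyed by " + k).orElse("non-keyed");
    }

    public static void main(String[] args) {
        System.out.println(describe(new FixedKeyStateManager("user-42"))); // keyed by user-42
        System.out.println(describe(new FixedKeyStateManager(null))); // non-keyed
    }
}
```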

WatermarkManager

Code Block
languagejava
titleWatermarkManager.java
/**
 * This is responsible for managing runtime information related to the watermarks of a process
 * function.
 */
public interface WatermarkManager {
    // The logic of processing watermarks is omitted in this FLIP.
}

The logic of processing watermarks is omitted here; it will be introduced in detail in later sub-FLIPs.

TimestampManager

Code Block
languagejava
titleTimestampManager.java
/** This is responsible for retrieving timestamp-related information of a process function. */
public interface TimestampManager {
    /** Get the timestamp of the record currently being processed. */
    long getCurrentRecordTimestamp();
}

RuntimeContext

Code Block
languagejava
titleRuntimeContext.java
/**
 * A RuntimeContext contains information about the context in which process functions are executed.
 * Each parallel instance of the function will have a context through which it can access contextual
 * information, such as the current key and execution mode. Through this context, we can also
 * interact with the execution layer, such as getting state, emitting watermarks, registering
 * timers, etc.
 */
public interface RuntimeContext {
    /** Get the {@link JobInfo} of this process function. */
    JobInfo getJobInfo();

    /** Get the {@link TaskInfo} of this process function. */
    TaskInfo getTaskInfo();

    /** Get the {@link ProcessingTimeManager} of this process function. */
    ProcessingTimeManager getProcessingTimeManager();

    /** Get the {@link StateManager} of this process function. */
    StateManager getStateManager();

    /** Get the {@link WatermarkManager} of this process function. */
    WatermarkManager getWatermarkManager();

    /** Get the metric group of this process function. */
    OperatorMetricGroup getMetricGroup();

    /** Get the {@link TimestampManager} of this process function. */
    TimestampManager getTimestampManager();
}

Its subclass NonPartitionedContext supports only partition-independent methods: getJobInfo, getTaskInfo, getWatermarkManager, getMetricGroup and getTimestampManager. Partition-related operations, such as registering and deleting processing timers and getting state, are not supported because the current partition cannot be determined.
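One way to picture this restriction is a context that refuses partition-related calls at runtime. In the sketch below, the interfaces are trimmed to two methods and JobInfo is reduced to a job name. The text does not specify whether NonPartitionedContext throws on the unsupported getters or hides them in some other way. The UnsupportedOperationException approach, ToyNonPartitionedContext and its constructor are therefore assumptions.

```java
public class NonPartitionedContextSketch {

    /** Trimmed-down stand-ins; only the method names follow the FLIP. */
    interface JobInfo {
        String getJobName();
    }

    interface ProcessingTimeManager {
        void registerProcessingTimer(long timestamp);
    }

    interface RuntimeContext {
        JobInfo getJobInfo(); // partition-independent

        ProcessingTimeManager getProcessingTimeManager(); // partition-related
    }

    /** Subtype on which only partition-independent methods may be called. */
    interface NonPartitionedContext extends RuntimeContext {}

    /** Hypothetical implementation that rejects partition-related operations. */
    static final class ToyNonPartitionedContext implements NonPartitionedContext {
        private final String jobName;

        ToyNonPartitionedContext(String jobName) {
            this.jobName = jobName;
        }

        @Override
        public JobInfo getJobInfo() {
            return () -> jobName; // JobInfo has a single abstract method in this sketch
        }

        @Override
        public ProcessingTimeManager getProcessingTimeManager() {
            // No current partition (e.g. no current key), so a timer cannot be scoped to one.
            throw new UnsupportedOperationException(
                    "Processing timers are not supported in a NonPartitionedContext");
        }
    }

    public static void main(String[] args) {
        NonPartitionedContext ctx = new ToyNonPartitionedContext("word-count");
        System.out.println(ctx.getJobInfo().getJobName()); // word-count
        try {
            ctx.getProcessingTimeManager().registerProcessingTimer(1_000L);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```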

ProcessFunction

At the same time, we will extend the process function interfaces. More specifically, we will add a new method called onProcessingTimer to all types of process functions. It is the callback for timers registered through the ProcessingTimeManager returned by RuntimeContext#getProcessingTimeManager.
Note: The logic for processing watermarks is omitted; we will elaborate on it in future sub-FLIPs.

OneInputStreamProcessFunction

Code Block
languagejava
titleOneInputStreamProcessFunction.java
/** This class contains all logic related to processing records from a single input. */
public interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction {
    // Data processing and life-cycle methods are omitted here; see FLIP-409.

    /**
     * Callback for processing timer.
     *
     * @param timestamp when this callback is triggered.
     * @param output to emit record.
     * @param ctx runtime context in which this function is executed.
     */
    default void onProcessingTimer(long timestamp, Collector<OUT> output, RuntimeContext ctx) {}
}
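A typical use of onProcessingTimer is to register a timer while processing a record and emit output when the timer fires. The sketch below makes several assumptions. It assumes a processRecord(IN, Collector<OUT>, RuntimeContext) signature from FLIP-409, which is not repeated in this FLIP. It trims Collector and RuntimeContext to the methods it needs. It fires the timer by hand instead of through a runtime. BufferAndFlush and the 1000 ms delay are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

public class OneInputTimerSketch {

    /** Minimal stand-ins; the real interfaces have more methods. */
    interface Collector<T> {
        void collect(T record);
    }

    interface ProcessingTimeManager {
        void registerProcessingTimer(long timestamp);

        long currentProcessingTime();
    }

    interface RuntimeContext {
        ProcessingTimeManager getProcessingTimeManager();
    }

    interface OneInputStreamProcessFunction<IN, OUT> {
        // Assumed to match FLIP-409's processRecord signature.
        void processRecord(IN record, Collector<OUT> output, RuntimeContext ctx);

        // The callback added by this FLIP (no-op by default).
        default void onProcessingTimer(long timestamp, Collector<OUT> output, RuntimeContext ctx) {}
    }

    /** Hypothetical function: buffers records and flushes them 1000 ms after they arrive. */
    static final class BufferAndFlush implements OneInputStreamProcessFunction<String, String> {
        private final List<String> buffer = new ArrayList<>();

        @Override
        public void processRecord(String record, Collector<String> output, RuntimeContext ctx) {
            buffer.add(record);
            ProcessingTimeManager timers = ctx.getProcessingTimeManager();
            timers.registerProcessingTimer(timers.currentProcessingTime() + 1000);
        }

        @Override
        public void onProcessingTimer(long timestamp, Collector<String> output, RuntimeContext ctx) {
            // Flush everything buffered so far; a later timer may find the buffer empty.
            for (String record : buffer) {
                output.collect(record + "@" + timestamp);
            }
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        List<Long> registered = new ArrayList<>();
        ProcessingTimeManager timers =
                new ProcessingTimeManager() {
                    @Override
                    public void registerProcessingTimer(long timestamp) {
                        registered.add(timestamp);
                    }

                    @Override
                    public long currentProcessingTime() {
                        return 5_000L; // frozen clock for the example
                    }
                };
        RuntimeContext ctx = () -> timers;
        List<String> out = new ArrayList<>();

        BufferAndFlush fn = new BufferAndFlush();
        fn.processRecord("a", out::add, ctx);
        fn.processRecord("b", out::add, ctx);
        // Stand-in for the runtime: fire the first registered timer by hand.
        fn.onProcessingTimer(registered.get(0), out::add, ctx);
        System.out.println(out); // [a@6000, b@6000]
    }
}
```

Draining the whole buffer, rather than one record per timer, keeps the sketch correct even if a runtime merges timers registered for the same timestamp.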

TwoInputNonBroadcastStreamProcessFunction

Code Block
languagejava
titleTwoInputNonBroadcastStreamProcessFunction.java
/** This class contains all logic related to processing records from two non-broadcast inputs. */
public interface TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    // Data processing and life-cycle methods are omitted here; see FLIP-409.

    /**
     * Callback for processing timer.
     *
     * @param timestamp when this callback is triggered.
     * @param output to emit record.
     * @param ctx runtime context in which this function is executed.
     */
    default void onProcessingTimer(long timestamp, Collector<OUT> output, RuntimeContext ctx) {}
}
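For two inputs, the timer callback is not tied to the input that registered the timer. It sees state updated from both sides and emits through the single shared output. The sketch below assumes the FLIP-409 method names processRecordFromFirstInput and processRecordFromSecondInput, with the same parameter shape as the one-input case. CountBothInputs and the fixed REPORT_AT time are hypothetical, and the timer is fired by hand.

```java
import java.util.ArrayList;
import java.util.List;

public class TwoInputTimerSketch {

    /** Minimal stand-ins for the interfaces referenced above. */
    interface Collector<T> {
        void collect(T record);
    }

    interface ProcessingTimeManager {
        void registerProcessingTimer(long timestamp);
    }

    interface RuntimeContext {
        ProcessingTimeManager getProcessingTimeManager();
    }

    interface TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> {
        // Assumed to match FLIP-409's method names and signatures.
        void processRecordFromFirstInput(IN1 record, Collector<OUT> output, RuntimeContext ctx);

        void processRecordFromSecondInput(IN2 record, Collector<OUT> output, RuntimeContext ctx);

        default void onProcessingTimer(long timestamp, Collector<OUT> output, RuntimeContext ctx) {}
    }

    /** Hypothetical function: counts records from both inputs and reports at a fixed time. */
    static final class CountBothInputs
            implements TwoInputNonBroadcastStreamProcessFunction<String, Integer, String> {
        static final long REPORT_AT = 10_000L; // hypothetical report time
        private int first;
        private int second;

        @Override
        public void processRecordFromFirstInput(
                String record, Collector<String> output, RuntimeContext ctx) {
            first++;
            ctx.getProcessingTimeManager().registerProcessingTimer(REPORT_AT);
        }

        @Override
        public void processRecordFromSecondInput(
                Integer record, Collector<String> output, RuntimeContext ctx) {
            second++;
            ctx.getProcessingTimeManager().registerProcessingTimer(REPORT_AT);
        }

        @Override
        public void onProcessingTimer(long timestamp, Collector<String> output, RuntimeContext ctx) {
            // The callback is not tied to either input; it sees counters updated by both.
            output.collect("first=" + first + ", second=" + second);
        }
    }

    public static void main(String[] args) {
        ProcessingTimeManager ignoreTimers = timestamp -> {}; // timers are fired by hand below
        RuntimeContext ctx = () -> ignoreTimers;
        List<String> out = new ArrayList<>();

        CountBothInputs fn = new CountBothInputs();
        fn.processRecordFromFirstInput("a", out::add, ctx);
        fn.processRecordFromFirstInput("b", out::add, ctx);
        fn.processRecordFromSecondInput(7, out::add, ctx);
        fn.onProcessingTimer(CountBothInputs.REPORT_AT, out::add, ctx);
        System.out.println(out); // [first=2, second=1]
    }
}
```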

...

TwoInputBroadcastStreamProcessFunction

Code Block
languagejava
titleTwoInputBroadcastStreamProcessFunction.java
/**
 * This class contains all logic related to processing records from a broadcast stream and a
 * non-broadcast stream.
 */
public interface TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    // Data processing and life-cycle methods are omitted here; see FLIP-409.

    /**
     * Callback for processing timer.
     *
     * @param timestamp when this callback is triggered.
     * @param output to emit record.
     * @param ctx runtime context in which this function is executed.
     */
    default void onProcessingTimer(long timestamp, Collector<OUT> output, RuntimeContext ctx) {}
}

...

Code Block
languagejava
titleTwoOutputStreamProcessFunction.java
/** This class contains all logic related to processing records and emitting them to two outputs. */
public interface TwoOutputStreamProcessFunction<IN, OUT1, OUT2> extends ProcessFunction {
    // Data processing and life-cycle methods are omitted here; see FLIP-409.

    /**
     * Callback for processing timer.
     *
     * @param timestamp when this callback is triggered.
     * @param output1 to emit record.
     * @param output2 to emit record.
     * @param ctx runtime context in which this function is executed.
     */
    default void onProcessingTimer(
            long timestamp, Collector<OUT1> output1, Collector<OUT2> output2, RuntimeContext ctx) {}
}
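The two-output variant hands the timer callback both collectors, so a timer can emit to either output, to both, or to neither. In the sketch below, a hypothetical ValidateAndReport function sends valid records to output1 and rejects plus a periodic report to output2. The processRecord(IN, Collector<OUT1>, Collector<OUT2>, RuntimeContext) signature is assumed from FLIP-409, RuntimeContext is reduced to an empty marker, and the timer is fired by hand.

```java
import java.util.ArrayList;
import java.util.List;

public class TwoOutputTimerSketch {

    /** Minimal stand-ins for the interfaces referenced above. */
    interface Collector<T> {
        void collect(T record);
    }

    interface RuntimeContext {}

    interface TwoOutputStreamProcessFunction<IN, OUT1, OUT2> {
        // Assumed to match FLIP-409's processRecord signature for two outputs.
        void processRecord(
                IN record, Collector<OUT1> output1, Collector<OUT2> output2, RuntimeContext ctx);

        default void onProcessingTimer(
                long timestamp, Collector<OUT1> output1, Collector<OUT2> output2, RuntimeContext ctx) {}
    }

    /** Hypothetical function: valid records go to output1, rejects and reports to output2. */
    static final class ValidateAndReport
            implements TwoOutputStreamProcessFunction<String, String, String> {
        private int rejected;

        @Override
        public void processRecord(
                String record, Collector<String> valid, Collector<String> side, RuntimeContext ctx) {
            if (record.isEmpty()) {
                rejected++;
                side.collect("rejected empty record");
            } else {
                valid.collect(record);
            }
        }

        @Override
        public void onProcessingTimer(
                long timestamp, Collector<String> valid, Collector<String> side, RuntimeContext ctx) {
            // The callback may write to either output; here it only reports on output2.
            side.collect("report@" + timestamp + ": rejected=" + rejected);
        }
    }

    public static void main(String[] args) {
        List<String> valid = new ArrayList<>();
        List<String> side = new ArrayList<>();
        RuntimeContext ctx = new RuntimeContext() {};
        ValidateAndReport fn = new ValidateAndReport();

        fn.processRecord("a", valid::add, side::add, ctx);
        fn.processRecord("", valid::add, side::add, ctx);
        // Stand-in for the runtime firing a timer registered elsewhere.
        fn.onProcessingTimer(60_000L, valid::add, side::add, ctx);
        System.out.println(valid); // [a]
        System.out.println(side); // [rejected empty record, report@60000: rejected=1]
    }
}
```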

...