Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • JobInfo: Hold all job related information, such as job name, execution mode.

  • TaskInfo: Hold all task related information, such as parallelism.

  • State Manager: Manage context related to state, such as accessing a specific state.

  • Watermark Manager: Manage context related to generalized watermark, such as triggering a watermark.

  • ProcessingTime Manager: Manage context related to processing timer, such as the current processing time.

  • MetricManager: Manage context related to metrics, such as get the metric group of this processing.
  • TimestampManager: Manager context related to timestamp of record which processed by this function.

...

The logic of processing watermark is omitted, but we will introduce it in detail in other sub-FLIPs later.

...

TimestampManager

Code Block
languagejava
titleMetricManagerTimestampManager.java
/** This is responsibility for managing metricsRetrieving timestamp related things of the process function. */
public interface MetricManagerTimestampManager {
    /**
     * Returns Get the metrictimestamp groupof forcurrent thisprocessing functionrecord.
     */
     * @return The metric group for this function.
     */
    OperatorMetricGroup getMetricGroup();
}

...

long getCurrentRecordTimestamp();
}

RuntimeContext

Code Block
languagejava
titleTimestampManagerRuntimeContext.java
/**
 This* A isRuntimeContext responsibilitycontains forinformation Retrievingabout timestampthe relatedcontext thingsin ofwhich process functionfunctions are executed.
 */
public interfaceEach TimestampManagerparallel {
    /** Get the timestamp of current processing record. */
    long getCurrentRecordTimestamp();
}

RuntimeContext

Code Block
languagejava
titleRuntimeContext.java
/**
 * A RuntimeContext contains information about the context in which process functions are executed.
 * Each parallel instance of the function will have a context through which it can access contextual
 * information, such as the current instance of the function will have a context through which it can access contextual
 * information, such as the current key and execution mode. Through this context, we can also
 * interact with the execution layer, such as getting state, emitting watermark, registering timer, etc.
*/
public interface RuntimeContext {
    /** Get the {@link JobInfo} of this process function. */
    JobInfo getJobInfo();

    /** Get the {@link TaskInfo} of this process function. */
    TaskInfo getTaskInfo();

    /** Get the {@link ProcessingTimeManager} of this process function. */
    ProcessingTimeManager getProcessingTimeManager();

    /** Get the {@link StateManager} of this process function. */
    StateManager getStateManager();

    /** Get the {@link WatermarkManager} of this process function. */
    WatermarkManager getWatermarkManager();

    
  
    /** Get the {@linkmetric MetricManager}group of this process function. */
    MetricManagerOperatorMetricGroup getMetricManagergetMetricGroup();

      /** Get the {@link TimestampManager} of this process function. */
    TimestampManager getTimestampManager();
}

...

At the same time, we should expand the implementation of Process Function. In more detail, we will introduce a new method called onProcessingTimer to all type of process functions. It is the callback for the timer registered through RuntimeContext#ProcessingTimeManager.
Note: The logic for processing watermark is omitted, but we will elaborate on it later in the future sub-FLIPs.

...

it later in the future sub-FLIPs.

OneInputStreamProcessFunction

Code Block
languagejava
titleOneInputStreamProcessFunction.java
/** This class contains all logical related to process records from single input. */
public interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction {
    // Omit logic of processing data and life-cycle methods. They can be found in FLIP-1.

    /**
     * Callback for processing timer.
     *
     * @param timestamp when this callback is triggered.
     * @param output to emit record.
     * @param ctx, runtime context in which this function is executed.
     */
    default void onProcessingTimer(long timestamp, Collector<OUT> output, RuntimeContext ctx) {}
}

TwoInputStreamProcessFunction

Code Block
languagejava
titleOneInputStreamProcessFunctionTwoInputStreamProcessFunction.java
/** This class contains all logical related to process records from singletwo input. */
public interface OneInputStreamProcessFunction<INTwoInputStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    // Omit logic of processing data and life-cycle methods. They can be found in FLIP-1.

    /**
     * Callback for processing timer.
     *
     * @param timestamp when this callback is triggered.
     * @param output to emit record.
     * @param ctx, runtime context in which this function is executed.
     */
    default void onProcessingTimer(long timestamp, Collector<OUT> output, RuntimeContext ctx) {}
}

...

BroadcastTwoInputStreamProcessFunction

Code Block
languagejava
titleTwoInputStreamProcessFunctionBroadcastTwoInputStreamProcessFunction.java
/** This class contains all logical related to process records from two input. */
public interface TwoInputStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    // Omit logic of processing data and life-cycle methods. They can be found in FLIP-1.

    /**
     * Callback for processing timer.
     *
     * @param timestamp when this callback is triggered.
     * @param output to emit record.
     * @param ctx, runtime context in which this function is executed.
     */
    default void onProcessingTimer(long timestamp, Collector<OUT> output, RuntimeContext ctx) {}
}

...