...
Regarding the second point, process, connectAndProcess, and other operations performed on DataStreams will return instances of the corresponding interfaces. We will change the return types of the one-input, two-output, and two-input process methods and of the toSink methods as shown in the table below.
| Stream type | Function | Old return type | New return type |
|---|---|---|---|
| NonKeyedPartitionStream | process(OneInputStreamProcessFunction) | NonKeyedPartitionStream | ProcessConfigurableAndNonKeyedPartitionStream |
| NonKeyedPartitionStream | process(TwoOutputStreamProcessFunction) | TwoNonKeyedPartitionStreams | ProcessConfigurableAndTwoNonKeyedPartitionStreams |
| NonKeyedPartitionStream | connectAndProcess(NonKeyedPartitionStream, TwoInputNonBroadcastStreamProcessFunction) | NonKeyedPartitionStream | ProcessConfigurableAndNonKeyedPartitionStream |
| NonKeyedPartitionStream | connectAndProcess(BroadcastStream, TwoInputBroadcastStreamProcessFunction) | NonKeyedPartitionStream | ProcessConfigurableAndNonKeyedPartitionStream |
| NonKeyedPartitionStream | toSink(Sink) | void | ProcessConfigurable |
| KeyedPartitionStream | process(OneInputStreamProcessFunction, KeySelector) | KeyedPartitionStream | ProcessConfigurableAndKeyedPartitionStream |
| KeyedPartitionStream | process(OneInputStreamProcessFunction) | NonKeyedPartitionStream | ProcessConfigurableAndNonKeyedPartitionStream |
| KeyedPartitionStream | process(TwoOutputStreamProcessFunction, KeySelector, KeySelector) | TwoKeyedPartitionStreams | ProcessConfigurableAndTwoKeyedPartitionStreams |
| KeyedPartitionStream | process(TwoOutputStreamProcessFunction) | TwoNonKeyedPartitionStreams | ProcessConfigurableAndTwoNonKeyedPartitionStreams |
| KeyedPartitionStream | connectAndProcess(KeyedPartitionStream, TwoInputNonBroadcastStreamProcessFunction) | NonKeyedPartitionStream | ProcessConfigurableAndNonKeyedPartitionStream |
| KeyedPartitionStream | connectAndProcess(KeyedPartitionStream, TwoInputNonBroadcastStreamProcessFunction, KeySelector) | KeyedPartitionStream | ProcessConfigurableAndKeyedPartitionStream |
| KeyedPartitionStream | connectAndProcess(BroadcastStream, TwoInputBroadcastStreamProcessFunction) | NonKeyedPartitionStream | ProcessConfigurableAndNonKeyedPartitionStream |
| KeyedPartitionStream | connectAndProcess(BroadcastStream, TwoInputBroadcastStreamProcessFunction, KeySelector) | KeyedPartitionStream | ProcessConfigurableAndKeyedPartitionStream |
| KeyedPartitionStream | toSink(Sink) | void | ProcessConfigurable |
| GlobalStream | process(OneInputStreamProcessFunction) | GlobalStream | ProcessConfigurableAndGlobalStream |
| GlobalStream | process(TwoOutputStreamProcessFunction) | TwoGlobalStreams | ProcessConfigurableAndTwoGlobalStreams |
| GlobalStream | connectAndProcess(GlobalStream, TwoInputNonBroadcastStreamProcessFunction) | GlobalStream | ProcessConfigurableAndGlobalStream |
| GlobalStream | toSink(Sink) | void | ProcessConfigurable |
| BroadcastStream | connectAndProcess(KeyedPartitionStream, TwoInputBroadcastStreamProcessFunction) | NonKeyedPartitionStream | ProcessConfigurableAndNonKeyedPartitionStream |
| BroadcastStream | connectAndProcess(NonKeyedPartitionStream, TwoInputBroadcastStreamProcessFunction) | NonKeyedPartitionStream | ProcessConfigurableAndNonKeyedPartitionStream |
| BroadcastStream | connectAndProcess(KeyedPartitionStream, TwoInputBroadcastStreamProcessFunction, KeySelector) | KeyedPartitionStream | ProcessConfigurableAndKeyedPartitionStream |
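To illustrate why these combined return types are useful, here is a minimal, self-contained Java model of the pattern: the object returned by process both configures the operator that produced it and continues to act as a stream. All types in it (ProcessConfigurable with withName/withParallelism, NonKeyedStream, StreamImpl, OperatorSpec) are hypothetical stand-ins written for this sketch, not the Flink API; the configuration methods actually offered by ProcessConfigurable are not listed in this section.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical model of the "ProcessConfigurableAnd...Stream" return types (not Flink classes). */
final class ProcessConfigurableSketch {

    /** Fluent configuration of the operator that produced a stream (hypothetical methods). */
    interface ProcessConfigurable<T extends ProcessConfigurable<T>> {
        T withName(String name);

        T withParallelism(int parallelism);
    }

    /** A heavily simplified non-keyed stream. */
    interface NonKeyedStream<T> {
        <OUT> ProcessConfigurableAndNonKeyedStream<OUT> process(Function<T, OUT> fn);

        List<String> describe();
    }

    /** Combined return type: configure the last operator, then keep transforming the stream. */
    interface ProcessConfigurableAndNonKeyedStream<T>
            extends NonKeyedStream<T>, ProcessConfigurable<ProcessConfigurableAndNonKeyedStream<T>> {}

    /** Settings of one operator, recorded while the pipeline is built. */
    static final class OperatorSpec {
        String name = "unnamed";
        int parallelism = 1;

        @Override
        public String toString() {
            return name + "(p=" + parallelism + ")";
        }
    }

    static final class StreamImpl<T> implements ProcessConfigurableAndNonKeyedStream<T> {
        private final List<OperatorSpec> pipeline; // shared by all streams of one pipeline
        private final OperatorSpec producer;       // operator that produced this stream

        StreamImpl(List<OperatorSpec> pipeline, OperatorSpec producer) {
            this.pipeline = pipeline;
            this.producer = producer;
        }

        @Override
        public <OUT> ProcessConfigurableAndNonKeyedStream<OUT> process(Function<T, OUT> fn) {
            // The function is not executed here; the sketch only records the topology.
            OperatorSpec spec = new OperatorSpec();
            pipeline.add(spec);
            return new StreamImpl<>(pipeline, spec);
        }

        @Override
        public ProcessConfigurableAndNonKeyedStream<T> withName(String name) {
            producer.name = name;
            return this;
        }

        @Override
        public ProcessConfigurableAndNonKeyedStream<T> withParallelism(int parallelism) {
            producer.parallelism = parallelism;
            return this;
        }

        @Override
        public List<String> describe() {
            List<String> result = new ArrayList<>();
            for (OperatorSpec spec : pipeline) {
                result.add(spec.toString());
            }
            return result;
        }
    }

    static List<String> buildExample() {
        List<OperatorSpec> pipeline = new ArrayList<>();
        OperatorSpec source = new OperatorSpec();
        source.name = "source";
        pipeline.add(source);
        NonKeyedStream<String> input = new StreamImpl<>(pipeline, source);

        return input
                .process(String::length).withName("length").withParallelism(4)
                .process(n -> n * 2).withName("double")
                .describe();
    }

    public static void main(String[] args) {
        System.out.println(buildExample()); // [source(p=1), length(p=4), double(p=1)]
    }
}
```

The self-bounded type parameter on ProcessConfigurable lets each configuration call return the combined type rather than a bare ProcessConfigurable, so chaining does not lose the stream methods. Whether the real interfaces use exactly this technique is an assumption of the sketch.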
Move SlotSharingGroup to flink-core-api module
...
- JobInfo: Holds all job-related information, such as the job name and execution mode.
- TaskInfo: Holds all task-related information, such as the parallelism.
- StateManager: Manages context related to state, such as accessing a specific state.
- WatermarkManager: Manages context related to generalized watermarks, such as triggering a watermark.
- ProcessingTimeManager: Manages context related to processing-time timers, such as querying the current processing time.
- Metric group: Gives access to the metric group of this function.
- TimestampManager: Manages context related to the timestamp of the record currently being processed by this function.
...
The logic for processing watermarks is omitted here; it will be introduced in detail in later sub-FLIPs.
...
TimestampManager
```java
/** Responsible for retrieving timestamp-related information for the process function. */
public interface TimestampManager {
    /** Returns the timestamp of the record currently being processed. */
    long getCurrentRecordTimestamp();
}
```
...
RuntimeContext
```java
/**
 * A RuntimeContext contains information about the context in which process functions are executed.
 * Each parallel instance of the function will have a context through which it can access contextual
 * information, such as the current key and execution mode. Through this context, we can also
 * interact with the execution layer, such as getting state, emitting watermark, registering timer, etc.
 */
public interface RuntimeContext {
    /** Get the {@link JobInfo} of this process function. */
    JobInfo getJobInfo();

    /** Get the {@link TaskInfo} of this process function. */
    TaskInfo getTaskInfo();

    /** Get the {@link ProcessingTimeManager} of this process function. */
    ProcessingTimeManager getProcessingTimeManager();

    /** Get the {@link StateManager} of this process function. */
    StateManager getStateManager();

    /** Get the {@link WatermarkManager} of this process function. */
    WatermarkManager getWatermarkManager();

    /** Get the metric group of this process function. */
    OperatorMetricGroup getMetricGroup();

    /** Get the {@link TimestampManager} of this process function. */
    TimestampManager getTimestampManager();
}
```
The RuntimeContext subclass NonPartitionedContext supports only partition-independent methods: getJobInfo, getTaskInfo, getWatermarkManager, getMetricGroup, and getTimestampManager. Partition-related operations, such as registering and deleting processing timers or accessing state, are not supported because the current partition cannot be determined.
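The following sketch models one way to enforce this restriction, assuming a NonPartitionedContext that delegates partition-independent calls and fails fast with UnsupportedOperationException on partition-related ones. The interface and its methods (getJobName, getParallelism, registerProcessingTimer, getState) are simplified hypothetical stand-ins for the JobInfo, TaskInfo, ProcessingTimeManager and StateManager accessors, not Flink classes, and the throwing behavior is an assumption rather than something this section specifies.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-ins for RuntimeContext and NonPartitionedContext (not Flink classes). */
final class NonPartitionedContextSketch {

    interface RuntimeContext {
        String getJobName();                   // partition-independent, stands in for getJobInfo()
        int getParallelism();                  // partition-independent, stands in for getTaskInfo()
        void registerProcessingTimer(long ts); // partition-related: scoped to the current partition
        String getState(String name);          // partition-related: scoped to the current partition
    }

    /** Context available while a record of a known partition is processed. */
    static final class PartitionedContext implements RuntimeContext {
        private final Map<String, String> state = new HashMap<>();
        private long registeredTimers;

        public String getJobName() { return "demo-job"; }
        public int getParallelism() { return 2; }
        public void registerProcessingTimer(long ts) { registeredTimers++; }
        public String getState(String name) { return state.getOrDefault(name, "<empty>"); }
    }

    /** Delegates partition-independent calls and rejects partition-related ones. */
    static final class NonPartitionedContext implements RuntimeContext {
        private final RuntimeContext delegate;

        NonPartitionedContext(RuntimeContext delegate) { this.delegate = delegate; }

        public String getJobName() { return delegate.getJobName(); }
        public int getParallelism() { return delegate.getParallelism(); }

        public void registerProcessingTimer(long ts) {
            throw new UnsupportedOperationException("The current partition cannot be determined.");
        }

        public String getState(String name) {
            throw new UnsupportedOperationException("The current partition cannot be determined.");
        }
    }

    /** Returns true if registering a timer through a NonPartitionedContext is rejected. */
    static boolean timerRejected() {
        RuntimeContext ctx = new NonPartitionedContext(new PartitionedContext());
        try {
            ctx.registerProcessingTimer(1_000L);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        RuntimeContext ctx = new NonPartitionedContext(new PartitionedContext());
        System.out.println(ctx.getJobName() + " p=" + ctx.getParallelism()); // demo-job p=2
        System.out.println("timer rejected: " + timerRejected());            // timer rejected: true
    }
}
```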
ProcessFunction
At the same time, we should extend the process function interfaces. Specifically, we will introduce a new method, onProcessingTimer, to all types of process functions. It is the callback for timers registered through the ProcessingTimeManager obtained from RuntimeContext.
Note: The logic for processing watermarks is omitted here; it will be elaborated on in future sub-FLIPs.
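The timer mechanism described above can be sketched as follows: a function registers timers while processing records, and once processing time reaches a registered timestamp, the runtime invokes onProcessingTimer, earliest timer first. This is a minimal single-threaded model, not Flink's implementation; Collector, ProcessingTimeManager (with hypothetical registerTimer/currentTime methods), OneInputFunction and TimerService are simplified stand-ins, and the RuntimeContext parameter is replaced by the timer manager to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical model of processing-time timers driving onProcessingTimer (not Flink classes). */
final class ProcessingTimerSketch {

    interface Collector<T> {
        void collect(T record);
    }

    /** Simplified stand-in for the ProcessingTimeManager obtained from RuntimeContext. */
    interface ProcessingTimeManager {
        void registerTimer(long timestamp);

        long currentTime();
    }

    interface OneInputFunction<IN, OUT> {
        void processRecord(IN record, Collector<OUT> output, ProcessingTimeManager timers);

        /** Same shape as the proposed callback: a no-op unless overridden. */
        default void onProcessingTimer(long timestamp, Collector<OUT> output, ProcessingTimeManager timers) {}
    }

    /** Fires registered timers in timestamp order once processing time reaches them. */
    static final class TimerService implements ProcessingTimeManager {
        private final PriorityQueue<Long> queue = new PriorityQueue<>();
        private long now;

        public void registerTimer(long timestamp) { queue.add(timestamp); }

        public long currentTime() { return now; }

        <OUT> void advanceTo(long time, OneInputFunction<?, OUT> fn, Collector<OUT> output) {
            now = time;
            while (!queue.isEmpty() && queue.peek() <= time) {
                fn.onProcessingTimer(queue.poll(), output, this);
            }
        }
    }

    static List<String> run() {
        List<String> emitted = new ArrayList<>();
        Collector<String> output = emitted::add;
        TimerService timers = new TimerService();

        // Emits a marker 100 time units (processing time) after each record arrives.
        OneInputFunction<String, String> fn = new OneInputFunction<String, String>() {
            @Override
            public void processRecord(String record, Collector<String> out, ProcessingTimeManager t) {
                t.registerTimer(t.currentTime() + 100);
            }

            @Override
            public void onProcessingTimer(long timestamp, Collector<String> out, ProcessingTimeManager t) {
                out.collect("timer@" + timestamp);
            }
        };

        fn.processRecord("a", output, timers); // registers a timer at 100
        timers.advanceTo(50, fn, output);      // nothing fires yet
        fn.processRecord("b", output, timers); // registers a timer at 150
        timers.advanceTo(200, fn, output);     // fires 100, then 150
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [timer@100, timer@150]
    }
}
```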
...
OneInputStreamProcessFunction
```java
/** This class contains all logic related to processing records from a single input. */
public interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction {
    // Logic for processing data and life-cycle methods is omitted here; see FLIP-409.

    /**
     * Callback for a processing-time timer.
     *
     * @param timestamp the time at which this callback is triggered.
     * @param output the collector used to emit records.
     * @param ctx the runtime context in which this function is executed.
     */
    default void onProcessingTimer(long timestamp, Collector<OUT> output, RuntimeContext ctx) {}
}
```
TwoInputNonBroadcastStreamProcessFunction
```java
/** This class contains all logic related to processing records from two non-broadcast inputs. */
public interface TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    // Logic for processing data and life-cycle methods is omitted here; see FLIP-409.

    /**
     * Callback for a processing-time timer.
     *
     * @param timestamp the time at which this callback is triggered.
     * @param output the collector used to emit records.
     * @param ctx the runtime context in which this function is executed.
     */
    default void onProcessingTimer(long timestamp, Collector<OUT> output, RuntimeContext ctx) {}
}
```
...
TwoInputBroadcastStreamProcessFunction
```java
/**
 * This class contains all logic related to processing records from a broadcast stream and a
 * non-broadcast stream.
 */
public interface TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    // Logic for processing data and life-cycle methods is omitted here; see FLIP-409.

    /**
     * Callback for a processing-time timer.
     *
     * @param timestamp the time at which this callback is triggered.
     * @param output the collector used to emit records.
     * @param ctx the runtime context in which this function is executed.
     */
    default void onProcessingTimer(long timestamp, Collector<OUT> output, RuntimeContext ctx) {}
}
```
...
```java
/** This class contains all logic related to processing records and emitting them to two outputs. */
public interface TwoOutputStreamProcessFunction<IN, OUT1, OUT2> extends ProcessFunction {
    // Logic for processing data and life-cycle methods is omitted here; see FLIP-409.

    /**
     * Callback for a processing-time timer.
     *
     * @param timestamp the time at which this callback is triggered.
     * @param output1 the collector used to emit records to the first output.
     * @param output2 the collector used to emit records to the second output.
     * @param ctx the runtime context in which this function is executed.
     */
    default void onProcessingTimer(
            long timestamp, Collector<OUT1> output1, Collector<OUT2> output2, RuntimeContext ctx) {}
}
```
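As a sketch of how a two-output function might use this callback, the hypothetical CountingFunction below counts records and, when its timer fires, sends the count to the first output, or an alert to the second output if nothing arrived. Collector and TwoOutputFunction are simplified stand-ins for the interfaces above (the RuntimeContext parameter is dropped), and the timer callback is invoked directly here instead of by a timer service.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical two-output function reacting to a processing-time timer (not Flink classes). */
final class TwoOutputTimerSketch {

    interface Collector<T> {
        void collect(T record);
    }

    interface TwoOutputFunction<IN, OUT1, OUT2> {
        void processRecord(IN record, Collector<OUT1> output1, Collector<OUT2> output2);

        /** Mirrors the proposed default: does nothing unless overridden. */
        default void onProcessingTimer(long timestamp, Collector<OUT1> output1, Collector<OUT2> output2) {}
    }

    /** Counts records; on each timer, emits the count to output1 or an alert to output2. */
    static final class CountingFunction implements TwoOutputFunction<String, Long, String> {
        private long count;

        @Override
        public void processRecord(String record, Collector<Long> output1, Collector<String> output2) {
            count++;
        }

        @Override
        public void onProcessingTimer(long timestamp, Collector<Long> output1, Collector<String> output2) {
            if (count == 0) {
                output2.collect("no records before " + timestamp);
            } else {
                output1.collect(count);
            }
            count = 0; // start a new interval
        }
    }

    static String run() {
        List<Long> counts = new ArrayList<>();
        List<String> alerts = new ArrayList<>();
        CountingFunction fn = new CountingFunction();

        fn.processRecord("a", counts::add, alerts::add);
        fn.processRecord("b", counts::add, alerts::add);
        fn.onProcessingTimer(1_000L, counts::add, alerts::add); // emits 2 to the first output
        fn.onProcessingTimer(2_000L, counts::add, alerts::add); // emits an alert to the second output
        return counts + " " + alerts;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [2] [no records before 2000]
    }
}
```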
...