Introduction

In FLIP-409: we have defined the most basic primitive of DataStream V2. On this basis, this FLIP will further answer several important questions closely related to it:

  1. How to configure the processing over the datastreams, such as setting the parallelism.
  2. How to get access to the runtime contextual information and services from inside the process functions.
  3. How to work with processing-time timers.

Proposed Changes

Configuration

After defining the process functions, user may want to make some configurations for the properties of this processing. For example, set the parallelism and name of the process operation, etc. 

Therefore, we hope that the return value of process/connectAndProcess meets the following two requirements at the same time:

  1. It should be a handle, allowing us to configure the previous processing.

  2. It should be a new DataStream, allowing us to do further processing on it.

The advantage of this is that configurations can be made more conveniently by continuously using `withXXX` . For example:

inputStream
  .process(func1) // do process 1
  .withConfigFoo(foo) // configure Foo for process 1
  .withConfigBar(bar) //  configure Bar for process 1
  .process(func2) //  do further process 2

To meet the requirement 1, we provide a unified interface to set these configuration options.

Note: There may be some other configuration items that will be gradually added as development proceeds.

ProcessConfigurable.java
/**
 * This represents the configuration handle of processing. Many processing-related properties can be
 * set through the withXXX method provided by this interface, such as withName, withUid, etc.
 */
public interface ProcessConfigurable<T extends ProcessConfigurable<T>> {
    /**
     * Sets an ID for this operator.
     *
     * <p>The specified ID is used to assign the same operator ID across job submissions (for
     * example when starting a job from a savepoint).
     *
     * <p><strong>Important</strong>: this ID needs to be unique per transformation and job.
     * Otherwise, job submission will fail.
     *
     * @param uid The unique user-specified ID of this transformation.
     * @return The operator with the specified ID.
     */
    T withUid(String uid);

    /**
     * Sets the name of the current data stream. This name is used by the visualization and logging
     * during runtime.
     *
     * @return The named operator.
     */
    T withName(String name);

    /**
     * Sets the parallelism for this operator.
     *
     * @param parallelism The parallelism for this operator.
     * @return The operator with set parallelism.
     */
    T withParallelism(int parallelism);

    /**
     * Sets the maximum parallelism of this operator.
     *
     * <p>The maximum parallelism specifies the upper bound for dynamic scaling. It also defines the
     * number of key groups used for partitioned state.
     *
     * @param maxParallelism Maximum parallelism
     * @return The operator with set maximum parallelism
     */
    T withMaxParallelism(int maxParallelism);

    /**
     * Sets the slot sharing group of this operation. Parallel instances of operations that are in
     * the same slot sharing group will be co-located in the same TaskManager slot, if possible.
     *
     * <p>Operations inherit the slot sharing group of input operations if all input operations are
     * in the same slot sharing group and no slot sharing group was explicitly specified.
     *
     * <p>Initially an operation is in the default slot sharing group. An operation can be put into
     * the default group explicitly by setting the slot sharing group with name {@code "default"}.
     *
     * @param slotSharingGroup Which contains name and its resource spec.
     */
    T withSlotSharingGroup(SlotSharingGroup slotSharingGroup);
}

At the same time, the following new interfaces are introduced, they connect specific stream and the processConfigurable interface.

/** This interface represents a configurable {@link NonKeyedPartitionStream}. */
interface ProcessConfigurableAndNonKeyedPartitionStream<T> extends NonKeyedPartitionStream<T>,
ProcessConfigurable<ProcessConfigurableAndNonKeyedPartitionStream<T>> {}

/** This interface represents a configurable {@link KeyedPartitionStream}. */
interface ProcessConfigurableAndKeyedPartitionStream<K, T> extends KeyedPartitionStream<K,T>,
ProcessConfigurable<ProcessConfigurableAndKeyedPartitionStream<K, T>> {}

/** This interface represents a configurable {@link GlobalStream}. */
interface ProcessConfigurableAndGlobalStream<T> extends GlobalStream<T>,
ProcessConfigurable<ProcessConfigurableAndGlobalStream<T>> {}


/** This interface represents a configurable {@link TwoNonKeyedPartitionStreams}. */
interface ProcessConfigurableAndTwoNonKeyedPartitionStreams<T1, T2> extends TwoNonKeyedPartitionStreams<T1, T2>,
ProcessConfigurable<ProcessConfigurableAndTwoNonKeyedPartitionStreams<T1, T2>> {}

/** This interface represents a configurable {@link TwoKeyedPartitionStreams}. */
interface ProcessConfigurableAndTwoKeyedPartitionStreams<K, T1, T2> extends TwoKeyedPartitionStreams<K, T1, T2>,
ProcessConfigurable<ProcessConfigurableAndTwoKeyedPartitionStreams<K, T1, T2>> {}


/** This interface represents a configurable {@link TwoGlobalStreams}. */
interface ProcessConfigurableAndTwoGlobalStreams<T1, T2> extends TwoGlobalStreams<T1, T2>,
ProcessConfigurable<ProcessConfigurableAndTwoGlobalStreams<T1, T2>> {}

Regarding the second point, processconnectAndProcess and other operations performed on DataStreams will return an instance of corresponding interfaces. We will change the return type of one-input/two-output/two-input process and toSink methods as shown in the table below.

stream type

function

old return type

new return type

NonKeyedPartitionStream

process(OneInputStreamProcessFunction)

NonKeyedPartitionStream

ProcessConfigurableAndNonKeyedPartitionStream

process(TwoOutputStreamProcessFunction)

TwoNonKeyedPartitionStreams

ProcessConfigurableAndTwoNonKeyedPartitionStreams

connectAndProcess(NonKeyedPartitionStream, TwoInputStreamProcessFunction)

NonKeyedPartitionStream

ProcessConfigurableAndNonKeyedPartitionStream

connectAndProcess(BroadcastStream, TwoInputBroadcastStreamProcessFunction

NonKeyedPartitionStream

ProcessConfigurableAndNonKeyedPartitionStream

toSink(Sink)

void

ProcessConfigurable

KeyedPartitionStream

process(OneInputStreamProcessFunction, KeySelector)

KeyedPartitionStream

ProcessConfigurableAndKeyedPartitionStream

process(OneInputStreamProcessFunction)

NonKeyedPartitionStream

ProcessConfigurableAndNonKeyedPartitionStream

process(TwoOutputStreamProcessFunction, KeySelector, KeySelector)

TwoKeyedPartitionStreams

ProcessConfigurableAndTwoKeyedPartitionStreams

process(TwoOutputStreamProcessFunction)

TwoNonKeyedPartitionStreams

ProcessConfigurableAndTwoNonKeyedPartitionStreams

connectAndProcess(KeyedPartitionStream, TwoInputNonBroadcastStreamProcessFunction);

NonKeyedPartitionStream

ProcessConfigurableAndNonKeyedPartitionStream

connectAndProcess(KeyedPartitionStream, TwoInputNonBroadcastStreamProcessFunction, KeySelector)

KeyedPartitionStream

ProcessConfigurableAndKeyedPartitionStream

connectAndProcess(BroadcastStream, TwoInputBroadcastStreamProcessFunction)

NonKeyedPartitionStream

ProcessConfigurableAndNonKeyedPartitionStream

connectAndProcess(BroadcastStream, TwoInputBroadcastStreamProcessFunction, KeySelector)

KeyedPartitionStream

ProcessConfigurableAndKeyedPartitionStream

toSink(Sink)

void

ProcessConfigurable

GlobalStream

process(OneInputStreamProcessFunction)

GlobalStream

ProcessConfigurableAndGlobalStream

process(TwoOutputStreamProcessFunction)

TwoGlobalStreams

ProcessConfigurableAndTwoGlobalStreams

connectAndProcess(GlobalStream, TwoInputNonBroadcastStreamProcessFunction)

GlobalStream

ProcessConfigurableAndGlobalStream

toSink(Sink)

void

ProcessConfigurable

BroadcastStream



connectAndProcess(KeyedPartitionStream, TwoInputBroadcastStreamProcessFunction)

NonKeyedPartitionStream

ProcessConfigurableAndNonKeyedPartitionStream

connectAndProcess(NonKeyedPartitionStream, TwoInputBroadcastStreamProcessFunction)

NonKeyedPartitionStream

ProcessConfigurableAndNonKeyedPartitionStream

connectAndProcess(KeyedPartitionStream, TwoInputBroadcastStreamProcessFunction, KeySelector)

KeyedPartitionStream

ProcessConfigurableAndKeyedPartitionStream

In order to support the configuration of fine-grained resources, the API layer must be able to set the specifications of each `SlotSharingGroup`. Therefore, we will move `org.apache.flink.api.common.operators.SlotSharingGroup` from the flink-core module to the flink-core-api module proposed in the umbrella FLIP.

Introduce Runtime Context

Unlike attributes like the name of process operation, some information(such as current key) can only be obtained when the process function is executed. In order to build a bridge between user functions and the execution engine, we provide a unified entrypoint called Runtime Context.

We divide all contextual into multiple parts according to their functions:

  • JobInfo: Hold all job related information, such as job name, execution mode.

  • TaskInfo: Hold all task related information, such as parallelism.

  • State Manager: Manage context related to state, such as accessing a specific state.

  • Watermark Manager: Manage context related to generalized watermark, such as triggering a watermark.

  • ProcessingTime Manager: Manage context related to processing timer, such as the current processing time.

  • TimestampManager: Manager context related to timestamp of record which processed by this function.

Their definitions are as follows:

JobInfo

JobInfor.java
/** The {@link JobInfo} represents the meta information of the job. */
public interface JobInfo {
    /**
     * Get the name of current job.
     *
     * @return the name of current job
     */
    String getJobName();

    /** Get the {@link ExecutionMode} of current job. */
    ExecutionMode getExecutionMode();

    /** Execution mode of this current job. */
    enum ExecutionMode {
        STREAMING,
        BATCH
    }

    // We will gradually add more related methods here.
}

TaskInfo

TaskInfo.java
/** The {@link TaskInfo} represents the meta information of the task. */
public interface TaskInfo {
    /**
     * Get the parallelism of current task.
     *
     * @return the parallelism of this process function.
     */
    int getParallelism();

    /**
     * Get the max parallelism of current task.
     *
     * @return The max parallelism.
     */
    int getMaxParallelism();

    /**
     * Get the name of current task.
     *
     * @return The name of current task.
     */
    String getTaskName();

    // We will gradually add more related methods here.
}

ProcessingTimeManager

ProcessingTimeManager.java
/**
 * This is responsibility for managing runtime information related to processing time of process
 * function.
 */
public interface ProcessingTimeManager {
    /**
     * Register a processing timer for this process function. onProcessingTimer method of this
     * function will be invoked as callback if the timer expires.
     *
     * @param timestamp to trigger timer callback.
     */
    void registerProcessingTimer(long timestamp);

    /**
     * Deletes the processing-time timer with the given trigger timestamp. This method has only an
     * effect if such a timer was previously registered and did not already expire.
     *
     * @param timestamp indicates the timestamp of the timer to delete.
     */
    void deleteProcessingTimeTimer(long timestamp);

    /**
     * Get the current processing time.
     *
     * @return current processing time.
     */
    long currentProcessingTime();
}

It should be noted that we can only register processing timer here. This is because we no longer unify event time and processing time into a single time service. Simply put, event Time can be implemented using the generalized watermark mechanism, and more detailed reasons have been explained in the Umbrella FLIP.

StateManager

StateManager.java
/** This is responsibility for managing runtime information related to state of process function. */
public interface StateManager {
    /**
     * Get the key of current record.
     *
     * @return The key of current processed record for {@link KeyedPartitionStream}. {@link
     *     Optional#empty()} for other non-keyed stream.
     */
    <K> Optional<K> getCurrentKey();

   // The logic of getting states is omitted in this FLIP.
    
}

The logic of getting states is omitted, but we will introduce it in detail in other sub-FLIPs later.

WatermarkManager

WatermarkManager.java
/**
 * This is responsibility for managing runtime information related to watermark of process function.
 */
public interface WatermarkManager {
     // The logic of processing watermark is omitted in this FLIP.
}

The logic of processing watermark is omitted, but we will introduce it in detail in other sub-FLIPs later.

TimestampManager

TimestampManager.java
/** This is responsibility for Retrieving timestamp related things of process function. */
public interface TimestampManager {
    /** Get the timestamp of current processing record. */
    long getCurrentRecordTimestamp();
}

RuntimeContext

RuntimeContext.java
/**
 * A RuntimeContext contains information about the context in which process functions are executed.
 * Each parallel instance of the function will have a context through which it can access contextual
 * information, such as the current key and execution mode. Through this context, we can also
 * interact with the execution layer, such as getting state, emitting watermark, registering timer, etc.
*/
public interface RuntimeContext {
    /** Get the {@link JobInfo} of this process function. */
    JobInfo getJobInfo();

    /** Get the {@link TaskInfo} of this process function. */
    TaskInfo getTaskInfo();

    /** Get the {@link ProcessingTimeManager} of this process function. */
    ProcessingTimeManager getProcessingTimeManager();

    /** Get the {@link StateManager} of this process function. */
    StateManager getStateManager();

    /** Get the {@link WatermarkManager} of this process function. */
    WatermarkManager getWatermarkManager();    
  
    /** Get the metric group of this process function. */
    OperatorMetricGroup getMetricGroup();

    /** Get the {@link TimestampManager} of this process function. */
    TimestampManager getTimestampManager();
}

Only partition independent methods are supported for its subclass NonPartitionedContext, they include getJobInfo, getTaskInfo, getWatermarkManager, getMetricGroup, getTimestampManager. But partition-related operations such as registering and deleting processing timers and getting state are not supported since the current partition cannot be decided.

ProcessFunction

At the same time, we should expand the implementation of Process Function. In more detail, we will introduce a new method called onProcessingTimer to all type of process functions. It is the callback for the timer registered through RuntimeContext#ProcessingTimeManager.
Note: The logic for processing watermark is omitted, but we will elaborate on it later in the future sub-FLIPs.

OneInputStreamProcessFunction

OneInputStreamProcessFunction.java
/** This class contains all logical related to process records from single input. */
public interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction {
    // Omit logic of processing data and life-cycle methods. They can be found in FLIP-409.

    /**
     * Callback for processing timer.
     *
     * @param timestamp when this callback is triggered.
     * @param output to emit record.
     * @param ctx, runtime context in which this function is executed.
     */
    default void onProcessingTimer(long timestamp, Collector<OUT> output, RuntimeContext ctx) {}
}

TwoInputNonBroadcastStreamProcessFunction

TwoInputNonBroadcastStreamProcessFunction.java
/** This class contains all logical related to process records from two non-broadcast input. */
public interface TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    // Omit logic of processing data and life-cycle methods. They can be found in FLIP-409.

    /**
     * Callback for processing timer.
     *
     * @param timestamp when this callback is triggered.
     * @param output to emit record.
     * @param ctx, runtime context in which this function is executed.
     */
    default void onProcessingTimer(long timestamp, Collector<OUT> output, RuntimeContext ctx) {}
}

TwoInputBroadcastStreamProcessFunction

TwoInputBroadcastStreamProcessFunction.java
/**
 * This class contains all logical related to process records from a broadcast stream and a
 * non-broadcast stream.
 */
public interface TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
    // Omit logic of processing data and life-cycle methods. They can be found in FLIP-409.

    /**
     * Callback for processing timer.
     *
     * @param timestamp when this callback is triggered.
     * @param output to emit record.
     * @param ctx, runtime context in which this function is executed.
     */
    default void onProcessingTimer(long timestamp, Collector<OUT> output, RuntimeContext ctx) {}
}

TwoOutputStreamProcessFunction

TwoOutputStreamProcessFunction.java
/** This class contains all logical related to process and emit records to two outputs. */
public interface TwoOutputStreamProcessFunction<IN, OUT1, OUT2> extends ProcessFunction {
    // Omit logic of processing data and life-cycle methods. They can be found in FLIP-409.

    /**
     * Callback for processing timer.
     *
     * @param timestamp when this callback is triggered.
     * @param output1 to emit record.
     * @param output2 to emit record.
     * @param ctx, runtime context in which this function is executed.
     */
    default void onProcessingTimer(
            long timestamp, Collector<OUT1> output1, Collector<OUT2> output2, RuntimeContext ctx) {}
}

Public Interfaces

The two interfaces ProcessConfigurable and RuntimeContext are introduced. The specific definitions are as mentioned above.

Compatibility, Deprecation, and Migration Plan

The contents described in this FLIP are all new APIs and do not involve compatibility issues.

Test Plan

Unit tests and integration test(A simple stateless job) will be added to guard these changes.