


Status

Current state

Discussion thread:

JIRA:

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Assume the user needs to perform a temporal join, where the Probe Side comes from Kafka and the Build Side comes from a MySQL CDC Source (with a snapshot reading phase and a binlog reading phase), and all input records lack event-time information. The user requires each record on the Probe Side to be joined with at least the records from the Build Side's snapshot phase. In other words, the Join operator needs to wait for the Build Side's snapshot phase to finish reading its records before processing the Probe Side's data.

Currently, Flink cannot support the use case described above. Flink SQL does not support the "SYSTEM_TIME AS OF" syntax used in a temporal join with the latest version of any view/table. This is because the TemporalProcessTimeJoinOperator supports temporal joins based on processing time but does not support the Probe Side waiting for records from the Build Side in processing-time mode. The operator may start processing the Probe Side's data before reading the data from the Build Side's snapshot phase, which can lead to situations where the Probe Side's data cannot be joined with any data, resulting in output that does not meet the user's requirements. For more details, please refer to FLINK-19830.

This document proposes a new API and mechanism at the DataStream level that allows source operators (e.g., HybridSource, MySQL CDC Source) to send a watermark indicating that the downstream operators' event time should start to increase according to the system time. In addition to enabling the processing-time temporal join operator, where the probe side can wait for the build side until the build side emits such a watermark, this change also allows us to simplify DataStream APIs such as KeyedStream#window(...) so that users would not need to explicitly differentiate between TumblingEventTimeWindows and TumblingProcessingTimeWindows.

Terminology and Background

The FLIP will make changes to Flink's watermark and timestamp concepts. To help understand the intuition behind the design, we recap the relevant concepts in this section.

Probe Side: The left side of the stream in a temporal join, sometimes referred to as the Fact Table. Usually, the data on the Probe Side does not need to be retained after processing.

...

Code Block
languagejava
titleWatermarkGenerator
@Public
public interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}
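
As a concrete example of the interface above, the snippet below is a simple bounded-out-of-orderness generator written against the current API; the 5-second bound is an arbitrary value chosen purely for illustration.

Code Block
languagejava
titleBoundedOutOfOrdernessExample
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

/** Emits watermarks that trail the largest timestamp seen so far by a fixed bound. */
public class BoundedOutOfOrdernessExample<T> implements WatermarkGenerator<T> {

    private static final long MAX_OUT_OF_ORDERNESS_MS = 5_000;

    // Start low enough that subtracting the bound below cannot underflow.
    private long maxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MS + 1;

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Called every pipeline.auto-watermark-interval (200ms by default).
        output.emitWatermark(new Watermark(maxTimestamp - MAX_OUT_OF_ORDERNESS_MS - 1));
    }
}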

Here is how a Flink DataStream program determines the value of the event time and the watermark (a short sketch follows the list below):

  • When creating a source, the user provides a `WatermarkStrategy` to `StreamExecutionEnvironment#fromSource`.
  • If the source supports event time natively (e.g., KafkaSource) or the user provides a custom `TimestampAssigner` in the `WatermarkStrategy` to extract the timestamp from the record, Flink will add the timestamp to the record. Otherwise, the timestamp on the record will be `Long.MIN_VALUE`.
  • If the user uses `NoWatermarksGenerator` in the `WatermarkStrategy`, the job will not generate watermarks. Otherwise, the job will periodically emit watermarks, and the watermark value depends on the event time. The emission frequency of watermarks is determined by `pipeline.auto-watermark-interval`, with a default value of 200ms.
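
To make the three points above concrete, here is a minimal sketch of wiring a `WatermarkStrategy` into `StreamExecutionEnvironment#fromSource`. The Kafka broker address and topic name are placeholders, not part of this FLIP.

Code Block
languagejava
titleWatermarkStrategyExample
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WatermarkStrategyExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("localhost:9092")
                        .setTopics("clicks")
                        .setStartingOffsets(OffsetsInitializer.latest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        // Event time: the TimestampAssigner attaches a timestamp to each record and
        // watermarks are emitted periodically (pipeline.auto-watermark-interval).
        DataStream<String> withEventTime =
                env.fromSource(
                        source,
                        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((record, kafkaTimestamp) -> kafkaTimestamp),
                        "kafka-with-event-time");

        // No watermarks: NoWatermarksGenerator is used, so the job emits no watermarks and
        // downstream event-time timers are not triggered.
        DataStream<String> withoutWatermarks =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-no-watermarks");

        withEventTime.print();
        withoutWatermarks.print();
        env.execute("watermark-strategy-example");
    }
}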

Public Interfaces

1) Add a `useProcessingTime` field to `org.apache.flink.api.common.eventtime.Watermark` and `org.apache.flink.streaming.api.watermark.Watermark`, with a default value of false.

Code Block
languagejava
titleWatermark
/**
 * Watermarks are the progress indicators in the data streams. A watermark signifies that no events
 * with a timestamp smaller than or equal to the watermark's time will occur after the watermark.
 *
 * <ul>
 *   <li>A watermark with a timestamp <i>T</i> and useProcessingTime set to false indicates that the
 *       event time of the stream has progressed to time <i>T</i>.
 *   <li>A watermark with a timestamp <i>T</i> and useProcessingTime set to true indicates that the
 *       event time of the stream progresses in synchronization with the system time. The timestamp
 *       <i>T</i> must be less than the current system time. Otherwise, an exception will be thrown.
 * </ul>
 *
 * <p>Watermarks are created at the sources and propagate through the streams and operators.
 *
 * <p>In some cases a watermark is only a heuristic, meaning some events with a lower timestamp may
 * still follow. In that case, it is up to the logic of the operators to decide what to do with the
 * "late events". Operators can for example ignore these late events, route them to a different
 * stream, or send update to their previously emitted results.
 *
 * <p>When a source reaches the end of the input, it emits a final watermark with timestamp {@code
 * Long.MAX_VALUE}, indicating the "end of time".
 *
 * <p>Note: A stream's time starts with a watermark of {@code Long.MIN_VALUE}. That means that all
 * records in the stream with a timestamp of {@code Long.MIN_VALUE} are immediately late.
 *
 * <p>Note: After sending a watermark with useProcessingTime set to true, the source should only send
 * subsequent watermarks with useProcessingTime set to true. Sending a watermark with
 * useProcessingTime set to false will result in an exception.
 */
@Public
public final class Watermark implements Serializable {
  	...

    /**
     * If useProcessingTime is set to false, this is the time of the watermark in milliseconds. If
     * useProcessingTime is set to true, this is the last effective time of the watermark in
     * milliseconds.
     */
    private final long timestamp;

  
    /**
     * If this is true, this watermark indicates the event time of the stream progresses in
     * synchronization with the system time.
     */
    private final boolean useProcessingTime;

    public Watermark(long timestamp) {
      this(timestamp, false);
    }

    public Watermark(long timestamp, boolean useProcessingTime) {
        this.timestamp = timestamp;
        this.useProcessingTime = useProcessingTime;
    }

    /** Returns whether the time of watermark can be determined by the system time. */
    public boolean useProcessingTime() {
        return useProcessingTime;
    }
}


Please note that the proposed change extends the Watermark class' expressibility without changing its core abstraction. When `useProcessingTime` is false, the watermark is a static value (i.e. Watermark#timestamp). When `useProcessingTime` is true, the watermark should be derived dynamically using a function (i.e. System#currentTimeMillis). In both cases, the Watermark class is used to tell downstream operators how to determine the watermark value.
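
As an illustration only (not one of this FLIP's public interfaces), the following sketch shows how a generator could use the proposed two-argument constructor: it emits regular event-time watermarks at first and, once some source-specific condition is met, switches to the processing-time style watermark. The class name and the switchToProcessingTime() hook are hypothetical.

Code Block
languagejava
titleSwitchToProcessingTimeWatermarkGeneratorSketch
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

/** Sketch of a generator that switches to the proposed processing-time watermark. */
public class SwitchToProcessingTimeWatermarkGenerator<T> implements WatermarkGenerator<T> {

    private long maxTimestamp = Long.MIN_VALUE;
    private boolean useProcessingTime;

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Hypothetical hook called by the source, e.g. when a bounded snapshot phase ends. */
    public void switchToProcessingTime() {
        this.useProcessingTime = true;
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (useProcessingTime) {
            // Proposed semantics: from now on the event time progresses with the system time.
            output.emitWatermark(new Watermark(Long.MIN_VALUE, true));
        } else {
            output.emitWatermark(new Watermark(maxTimestamp));
        }
    }
}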


2) Update `AbstractStreamOperator`/`AbstractStreamOperatorV2` to handle Watermark#useProcessingTime properly:

Original Behavior:

  • `AbstractStreamOperator` and `AbstractStreamOperatorV2` manage and trigger event-time and processing-time timers through `InternalTimeServiceManager`.

  • Only when the operator receives a watermark does it call `InternalTimeServiceManager#advanceWatermark` to advance the event time and trigger event-time timers.

  • After receiving a watermark, the operator triggers those event-time timers whose scheduled time <= watermark#getTimestamp.

Updated Behavior:

  • After receiving a watermark:
    • If watermark#useProcessingTime == false, the operator triggers those event-time timers whose scheduled time <= watermark#getTimestamp, exactly as before.
    • Otherwise, the operator starts a scheduler (e.g. a `ScheduledThreadPoolExecutor`) that dynamically triggers those event-time timers whose scheduled time <= System#currentTimeMillis, so from that point on the operator behaves like a processing-time operator. A simplified sketch of this branching follows.
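
The following standalone sketch illustrates the branching described above. It is not the real operator code: EventTimeTimers is a hypothetical stand-in for `InternalTimeServiceManager#advanceWatermark`, and the 200ms period mirrors the default watermark interval only for illustration.

Code Block
languagejava
titleWatermarkHandlingSketch
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Simplified illustration of the proposed watermark handling in the operator base classes. */
public class WatermarkHandlingSketch {

    /** Hypothetical stand-in for InternalTimeServiceManager#advanceWatermark. */
    public interface EventTimeTimers {
        void triggerTimersUpTo(long time) throws Exception;
    }

    private final EventTimeTimers timers;
    private ScheduledExecutorService processingTimeDriver;

    public WatermarkHandlingSketch(EventTimeTimers timers) {
        this.timers = timers;
    }

    /** The two arguments mirror the proposed Watermark#getTimestamp and Watermark#useProcessingTime. */
    public void processWatermark(long watermarkTimestamp, boolean useProcessingTime) throws Exception {
        if (!useProcessingTime) {
            // Same as today: fire event-time timers whose scheduled time <= watermark timestamp.
            timers.triggerTimersUpTo(watermarkTimestamp);
        } else if (processingTimeDriver == null) {
            // Proposed: from now on, fire event-time timers as the system time progresses.
            processingTimeDriver = Executors.newSingleThreadScheduledExecutor();
            processingTimeDriver.scheduleAtFixedRate(
                    () -> {
                        try {
                            timers.triggerTimersUpTo(System.currentTimeMillis());
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    },
                    0, 200, TimeUnit.MILLISECONDS);
        }
    }
}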


3) Update NoWatermarksGenerator#onPeriodicEmit to emit Watermark(timestamp=Long.MIN_VALUE, useProcessingTime=true).

Note that NoWatermarksGenerator is currently only used when every operator in the job uses "processing time mode", and these operators' behavior does not rely on the watermark value. Therefore the proposed change will not break existing jobs that use NoWatermarksGenerator.

Code Block
languagejava
titleNoWatermarksGenerator
/**
 * An implementation of a {@link WatermarkGenerator} that only
 * generates Watermark(timestamp=Long.MIN_VALUE, useProcessingTime=true).
 */
@Public
public final class NoWatermarksGenerator<E> implements WatermarkGenerator<E> {

    ...
}


4) Add delegateWatermarkPeriodicEmit() to the SourceReader interface.

Code Block
languagejava
titleSourceReader
@Public
public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseable, CheckpointListener {   
    ...

    /**
     * Provide SourceReader with a runnable that can be used to emit the watermark. SourceReader
     * should override this method and return true if it wants to own the responsibility
     * of invoking WatermarkGenerator#onPeriodicEmit.
     *
     * @return true iff the caller should avoid triggering WatermarkGenerator#onPeriodicEmit.
     */
    default boolean delegateWatermarkPeriodicEmit(Runnable onWatermarkEmit) {
        return false;
    }
}


Proposed Changes


1) SourceOperator behavior

  • The SourceOperator should invoke SourceReader#delegateWatermarkPeriodicEmit with a runnable that invokes WatermarkGenerator#onPeriodicEmit.
  • If delegateWatermarkPeriodicEmit() returns true, the SourceOperator should not start any scheduler that invokes WatermarkGenerator#onPeriodicEmit.
  • Otherwise, if emitProgressiveWatermarks == false, the SourceOperator should invoke WatermarkGenerator#onPeriodicEmit once before it emits the first record.
  • Otherwise, the SourceOperator keeps its existing behavior (i.e. it starts a scheduler to invoke WatermarkGenerator#onPeriodicEmit periodically). The sketch below illustrates this decision logic.
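
The decision logic above can be summarized with the following standalone sketch; the PeriodicScheduler helper and the method shown are hypothetical simplifications of what the real SourceOperator would do, and delegateWatermarkPeriodicEmit is the method proposed in this FLIP.

Code Block
languagejava
titleSourceOperatorEmitLogicSketch
import org.apache.flink.api.connector.source.SourceReader;

/** Simplified illustration of how the SourceOperator could decide who emits watermarks. */
public class SourceOperatorEmitLogicSketch {

    /** Hypothetical stand-in for the operator's processing-time service. */
    public interface PeriodicScheduler {
        void scheduleRepeatedly(Runnable task, long intervalMillis);
    }

    public static void setUpWatermarkEmission(
            SourceReader<?, ?> reader,
            boolean emitProgressiveWatermarks,
            Runnable onPeriodicEmit, // invokes WatermarkGenerator#onPeriodicEmit
            PeriodicScheduler scheduler,
            long autoWatermarkIntervalMillis) {

        // 1) Offer the reader the chance to own periodic watermark emission (proposed API).
        if (reader.delegateWatermarkPeriodicEmit(onPeriodicEmit)) {
            // The reader will call onPeriodicEmit itself, e.g. when its binlog phase starts.
            return;
        }
        // 2) Otherwise keep today's behavior.
        if (!emitProgressiveWatermarks) {
            onPeriodicEmit.run(); // emit once before the first record
        } else {
            scheduler.scheduleRepeatedly(onPeriodicEmit, autoWatermarkIntervalMillis);
        }
    }
}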


2) Update sources with bounded/unbounded phases (e.g. HybridSource, MySQL CDC Source) to override SourceReader#delegateWatermarkPeriodicEmit.

These sources should invoke onWatermarkEmit when they want to notify downstream operators of the watermark value (e.g. at the beginning of MySQL CDC binlog phase).
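
For example, a CDC-style reader with a snapshot phase followed by a binlog phase might override the proposed method roughly as follows. Everything except delegateWatermarkPeriodicEmit (the class name and the onBinlogPhaseStarted hook in particular) is hypothetical.

Code Block
languagejava
titleCdcStyleSourceReaderSketch
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceSplit;

/** Sketch of a reader with a bounded snapshot phase and an unbounded binlog phase. */
public abstract class CdcStyleSourceReaderSketch<T, SplitT extends SourceSplit>
        implements SourceReader<T, SplitT> {

    private Runnable onWatermarkEmit;

    @Override
    public boolean delegateWatermarkPeriodicEmit(Runnable onWatermarkEmit) {
        // Take ownership: the SourceOperator will not schedule periodic emission itself.
        this.onWatermarkEmit = onWatermarkEmit;
        return true;
    }

    /** Hypothetical hook invoked by the reader once the snapshot phase is fully consumed. */
    protected void onBinlogPhaseStarted() {
        // With NoWatermarksGenerator this emits Watermark(Long.MIN_VALUE, useProcessingTime=true),
        // telling downstream operators (e.g. the temporal join) to advance event time with system time.
        if (onWatermarkEmit != null) {
            onWatermarkEmit.run();
        }
    }
}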


3) Update TemporalRowTimeJoinOperator to optimize the case where watermarks from both inputs have useProcessingTime == true. 

After the modifications to the `AbstractStreamOperator` based on this FLIP, the `TemporalRowTimeJoinOperator`, as a subclass of `AbstractStreamOperator`, will be able to support temporal joins based on processing time when both the build side and the probe side send watermarks with `useProcessingTime` set to true (e.g. `MySqlSource` and `KafkaSource`).

To optimize performance, when both the probe side and the build side receive a watermark with `useProcessingTime` set to true, the operator can process the data in pure processing-time mode without timers: the probe-side data can directly join with the build side, and the build side only needs to keep the latest record without triggering clean-up based on event time. A sketch of this pure processing-time path follows below.


TODO: check whether it offers the best performance.
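
A sketch of the pure processing-time path described above (hypothetical types, not the actual TemporalRowTimeJoinOperator code):

Code Block
languagejava
titleProcessingTimeTemporalJoinSketch
import java.util.HashMap;
import java.util.Map;

/** Simplified illustration of the join path when both inputs are in useProcessingTime mode. */
public class ProcessingTimeTemporalJoinSketch<K, L, R> {

    /** Build side: only the latest row per key is kept, no event-time clean-up timers. */
    private final Map<K, R> latestBuildRow = new HashMap<>();

    public void processBuildSide(K key, R row) {
        latestBuildRow.put(key, row);
    }

    /** Probe side: join immediately against the latest build-side row, no buffering. */
    public JoinedRow<L, R> processProbeSide(K key, L row) {
        return new JoinedRow<>(row, latestBuildRow.get(key));
    }

    public static final class JoinedRow<L, R> {
        public final L left;
        public final R right; // null if the key has no build-side row yet

        JoinedRow(L left, R right) {
            this.left = left;
            this.right = right;
        }
    }
}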


4) Update Flink SQL Planner to support processing-time temporal join and remove TemporalProcessTimeJoinOperator

We will update the Flink SQL planner to select the appropriate temporal join operator based on the time attribute used in the join.

Original behavior:

  • If the temporal join is based on row time (event time), use `TemporalRowTimeJoinOperator`.

  • If the temporal join is based on processing time:

    • If the `TemporalFunctionJoin` syntax is used, use `TemporalProcessTimeJoinOperator`.

    • Otherwise, an exception is thrown.

Updated behavior:

  • Use TemporalRowTimeJoinOperator for temporal join.


TODO: check whether it is correct when TemporalJoinFunction syntax is used.


5) Update WatermarkToDataOutput to handle Watermark#useProcessingTime.

`WatermarkToDataOutput` is responsible for sending the watermark from the `SourceReader` downstream and ensuring that the watermark never decreases. The proposed change will keep this semantics.


Original Behavior:

  • When the `WatermarkToDataOutput` receives a watermark, it checks the timestamp of the watermark. It only sends the watermark downstream if the timestamp is greater than the timestamp of the most recently sent watermark.

Updated Behavior:

  • When the `WatermarkToDataOutput` receives a watermark, it checks the timestamp and the `useProcessingTime` field of the watermark. It only sends the watermark downstream if the timestamp is greater than the timestamp of the most recently sent watermark or if the `useProcessingTime` field is set to true.

  • If the watermark to be sent has `useProcessingTime` set to true, and the current system time is less than the timestamp of the most recently sent watermark, an exception is thrown.

  • If a watermark with `useProcessingTime` set to true has been previously sent, and the watermark to be sent has `useProcessingTime` set to false, an exception is thrown.

The updated `WatermarkToDataOutput` ensures that when `useProcessingTime` is false, watermark#timestamp never decreases. It also ensures that a watermark with `useProcessingTime` set to true is sent downstream and that `useProcessingTime` is never set back to false once it has been set to true. A simplified sketch of these checks follows.
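
A minimal self-contained sketch of these rules (a hypothetical class mirroring only the checks above, not the real WatermarkToDataOutput):

Code Block
languagejava
titleWatermarkToDataOutputSketch
/** Simplified illustration of the proposed forwarding rules. */
public class WatermarkToDataOutputSketch {

    private long lastEmittedTimestamp = Long.MIN_VALUE;
    private boolean processingTimeMode;

    /** Returns true if the watermark (timestamp + useProcessingTime pair) is forwarded downstream. */
    public boolean emitWatermark(long timestamp, boolean useProcessingTime) {
        if (useProcessingTime) {
            if (System.currentTimeMillis() < lastEmittedTimestamp) {
                throw new IllegalStateException(
                        "System time is behind the last emitted watermark timestamp.");
            }
            processingTimeMode = true;
            lastEmittedTimestamp = Math.max(lastEmittedTimestamp, timestamp);
            return true; // always forwarded
        }
        if (processingTimeMode) {
            throw new IllegalStateException(
                    "useProcessingTime cannot be set back to false once it has been set to true.");
        }
        if (timestamp <= lastEmittedTimestamp) {
            return false; // drop non-increasing watermarks, as today
        }
        lastEmittedTimestamp = timestamp;
        return true;
    }
}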


6) Update StatusWatermarkValve to handle Watermark#useProcessingTime.

The `StatusWatermarkValve` is used to calculate the current watermark for an input that has multiple input channels and invoke the `processWatermark` method of the operator. `StatusWatermarkValve` ensures that the watermark timestamp passed to the `processWatermark` method is the minimum timestamp among all the input channels of that input, and it should never decrease. The proposed change will keep this semantics.

Original behavior:

  • It keeps track of the maximum watermark timestamp seen and the status (active/idle, aligned/unaligned) for each input channel. Each input channel can be either active or idle. `StatusWatermarkValve` updates the status of each input channel when it receives a `WatermarkStatus` from the upstream.

  • Each input channel can be either aligned or unaligned. An input channel is considered aligned iff it is active and its watermark is greater than or equal to the last watermark timestamp of the input.

  • It gets the minimum watermark timestamp among all the aligned input channels and uses it as the watermark for the input. If the new watermark timestamp is greater than the previous watermark timestamp, it invokes the `processWatermark` method.

Updated Behavior:

  • It keeps track of the maximum watermark timestamp seen, the status (active/idle, aligned/unaligned), and `useProcessingTime` for each input channel. Each input channel can be either active or idle. `StatusWatermarkValve` updates the status of each input channel when it receives a `WatermarkStatus` from the upstream. If the input channel has `useProcessingTime` set to true, it is considered active.

  • Each input channel can be either aligned or unaligned. If the `useProcessingTime` of the last watermark of the input is false, an input channel is considered aligned iff it is active and its watermark is greater than or equal to the last watermark timestamp of the input. If the `useProcessingTime` of the last watermark of the input is true, an input channel is considered aligned iff it is active and its `useProcessingTime` is set to true.

  • If there exists any input channel with useProcessingTime == false, the watermark of the input is the minimum watermark timestamp among all the aligned input channels whose useProcessingTime == false. Otherwise, the watermark of the input is Watermark(timestamp=Long.MIN_VALUE, useProcessingTime=true).


After the update, we can still ensure that the effective watermark timestamp passed to the `processWatermark` method is the minimum among all input channels of that input, and that it never decreases. The chart below shows the effective watermark of the input given the watermarks of the two input channels.

Note that if any operator emits Watermark(timestamp=t, useProcessingTime=true), then it is required that t <= System#currentTimeMillis. This effectively makes Watermark(timestamp=t, useProcessingTime=true) equivalent to Watermark(timestamp=Long.MIN_VALUE, useProcessingTime=true).


InputChannel1 \ InputChannel2 | currentTimestamp = t2, useProcessingTime = true | currentTimestamp = t2, useProcessingTime = false

currentTimestamp = t1, useProcessingTime = true | Watermark(Long.MIN_VALUE, true) | Watermark(t2, false)

currentTimestamp = t1, useProcessingTime = false | Watermark(t1, false) | Watermark(MIN(t1, t2), false)
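
The combination rule in the chart can be expressed with a small self-contained sketch (hypothetical types, not the real StatusWatermarkValve):

Code Block
languagejava
titleCombinedWatermarkSketch
import java.util.List;

/** Simplified illustration of combining the last watermarks of the aligned input channels. */
public class CombinedWatermarkSketch {

    /** Minimal stand-in for an aligned input channel's last watermark. */
    public static final class ChannelWatermark {
        final long timestamp;
        final boolean useProcessingTime;

        public ChannelWatermark(long timestamp, boolean useProcessingTime) {
            this.timestamp = timestamp;
            this.useProcessingTime = useProcessingTime;
        }
    }

    /** Returns the effective watermark of the input given all aligned channels. */
    public static ChannelWatermark combine(List<ChannelWatermark> alignedChannels) {
        boolean hasEventTimeChannel = false;
        long minEventTimeTimestamp = Long.MAX_VALUE;
        for (ChannelWatermark w : alignedChannels) {
            if (!w.useProcessingTime) {
                hasEventTimeChannel = true;
                minEventTimeTimestamp = Math.min(minEventTimeTimestamp, w.timestamp);
            }
        }
        if (hasEventTimeChannel) {
            // At least one channel still reports a static event time: take the minimum of those.
            return new ChannelWatermark(minEventTimeTimestamp, false);
        }
        // All aligned channels progress with the system time.
        return new ChannelWatermark(Long.MIN_VALUE, true);
    }
}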


7) Update IndexedCombinedWatermarkStatus to handle Watermark#useProcessingTime.

The `IndexedCombinedWatermarkStatus` represents the combined value and status of a watermark for a set number of input partial watermarks. The operator advances the event time of the `timeServiceManager` based on the combined watermark. `IndexedCombinedWatermarkStatus` ensures that the watermark timestamp of an operator with multiple inputs is the minimum timestamp among all the inputs, and it should never decrease. The proposed change will keep this semantics.

Original behavior:

  • Each input can be either active or idle. `IndexedCombinedWatermarkStatus` updates the status of the input when it receives a `WatermarkStatus` from that input.

  • It gets the minimum watermark timestamp among all the active inputs and uses it as the watermark of the operator. If the new watermark timestamp is greater than the previous watermark timestamp, it advances the event time of the `timeServiceManager`.

Updated Behavior:

  • It keeps track of the maximum watermark timestamp seen, the status (active/idle), and the `useProcessingTime` for each input. Each input can be either active or idle. `IndexedCombinedWatermarkStatus` updates the status of the input when it receives a `WatermarkStatus` from that input. If the `useProcessingTime` of an input is set to true, it is always considered active.

  • If there exists any input with useProcessingTime == false, the watermark timestamp of the operator is the minimum watermark timestamp among all the active inputs. Otherwise, the watermark of the operator is in synchronization with the system time.



Example Usage

Here is the Flink SQL example that demonstrates how to perform processing time temporal join after the FLIP.


Code Block
languagesql
-- Create mysql cdc source table (dimension table)
CREATE TEMPORARY TABLE user_info (
    user_id INTEGER PRIMARY KEY NOT ENFORCED,
    gender STRING
) WITH (
    'connector' = 'mysql-cdc',
    'database-name' = 'example_database',
    'hostname' = 'localhost',
    'username' = 'root',
    'password' = 'root',
    'table-name' = 'user_info'
);

-- Create datagen source table (fact table)
CREATE TEMPORARY TABLE click_event (
    user_id INTEGER,
    item_id INTEGER,
    proctime AS PROCTIME()
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.user_id.min' = '0',
    'fields.user_id.max' = '9'
);

-- Create a print sink table
CREATE TEMPORARY TABLE print_sink (
    user_id INTEGER,
    item_id INTEGER,
    gender STRING
) WITH (
    'connector' = 'print'
);

-- Processing time temporal join
INSERT INTO print_sink
SELECT click_event.user_id AS user_id, item_id, gender FROM click_event
LEFT JOIN user_info FOR SYSTEM_TIME AS OF click_event.proctime
ON click_event.user_id = user_info.user_id;







Compatibility, Deprecation, and Migration Plan

The change might be backward incompatible in the following case:

  • The user writes a Flink SQL program with a temporal join in processing time that does not use the TemporalFunctionJoin syntax.
  • The source on the Build Side has a bounded/unbounded phase (e.g. HybridSource, MySQL CDC) and has not been updated to override SourceReader#delegateWatermarkPeriodicEmit as specified in this FLIP.
  • The user prefers the job to fail fast rather than produce results where probe-side records fail to join with the records from the bounded phase of the build-side source.

We believe the impact of the above scenario is outweighed by the benefits of this FLIP. In the common case, users will notice the data quality issue and can upgrade the source library version to resolve it. We should mention this change in the Flink release notes.

Other than the issue mentioned above, the changes made in this FLIP are backward compatible, for the following reasons:

  • We only add new APIs, which do not cause existing code handling Watermarks to fail. Additionally, the default setting for the `useProcessingTime` parameter in Watermark instances is false, preserving the existing semantics.
  • With the updates to `AbstractStreamOperator/AbstractStreamOperatorV2` based on the FLIP, all operators can support watermarks with `useProcessingTime` and correctly trigger their timers based on event time or the system time. For sources that haven't been updated, the watermarks they send always have `useProcessingTime` set to false. In this case, the behavior of the operators remains unchanged, ensuring compatibility with existing jobs.



Test Plan

The change will be covered with unit and integration tests.


Rejected Alternatives

Not yet.

Future Work

After this FLIP, we can unify the APIs for processing time and event time. Currently, in the DataStream API, users need to explicitly differentiate between processing time and event time in several places when expressing job logic. The following are some examples of API pairs that distinguish between event time and processing time.

When invoking methods like `DataStream.windowAll` or `KeyedDataStream.window`, users need to select the appropriate `WindowAssigner` based on processing time or event time.

  • org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
    • TumblingEventTimeWindows vs TumblingProcessingTimeWindows

    • SlidingEventTimeWindows vs SlidingProcessingTimeWindows

    • DynamicEventTimeSessionWindows vs DynamicProcessingTimeSessionWindows

    • EventTimeSessionWindows vs ProcessingTimeSessionWindows

When implementing custom `ProcessFunction` or `KeyedProcessFunction`, users need to differentiate between registering a timer for processing time or event time using the `TimerService`.

  • org.apache.flink.streaming.api.TimerService

    • registerProcessingTimeTimer vs registerEventTimeTimer
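
For reference, this is what the distinction looks like in a DataStream program today; the sketch assumes a keyed stream of Tuple2<String, Long> and a one-minute window/timer purely for illustration.

Code Block
languagejava
titleTimeChoiceTodaySketch
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class TimeChoiceTodaySketch {

    public static void windows(KeyedStream<Tuple2<String, Long>, String> keyed) {
        // The user has to pick the assigner that matches the job's time semantics:
        keyed.window(TumblingEventTimeWindows.of(Time.minutes(1))).sum(1);      // event time
        keyed.window(TumblingProcessingTimeWindows.of(Time.minutes(1))).sum(1); // processing time
    }

    /** Inside a process function, the same choice shows up when registering timers. */
    public static class TimerChoice extends KeyedProcessFunction<String, Tuple2<String, Long>, Long> {
        @Override
        public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Long> out) {
            ctx.timerService().registerEventTimeTimer(value.f1 + 60_000);   // event time
            ctx.timerService().registerProcessingTimeTimer(                 // processing time
                    ctx.timerService().currentProcessingTime() + 60_000);
        }
    }
}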