...

1) Modify `org.apache.flink.api.common.eventtime.Watermark` and `org.apache.flink.streaming.api.watermark.Watermark` by adding a new field `useProcessingTime`, with a default value of false.
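A minimal sketch of what the extended watermark could look like, assuming an immutable value class that keeps the existing single-argument constructor so that `useProcessingTime` defaults to false for all current callers. The class name `SketchWatermark` and the accessor `useProcessingTime()` are hypothetical; this is not the actual Flink class.

```java
import java.util.Objects;

/** Hypothetical stand-in for the modified Watermark classes (not the real Flink API). */
final class SketchWatermark {
    private final long timestamp;
    // New field proposed by the FLIP; false preserves the existing event time semantics.
    private final boolean useProcessingTime;

    /** Existing-style constructor: callers unaware of the new field get false. */
    SketchWatermark(long timestamp) {
        this(timestamp, false);
    }

    SketchWatermark(long timestamp, boolean useProcessingTime) {
        this.timestamp = timestamp;
        this.useProcessingTime = useProcessingTime;
    }

    long getTimestamp() {
        return timestamp;
    }

    boolean useProcessingTime() {
        return useProcessingTime;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SketchWatermark)) return false;
        SketchWatermark other = (SketchWatermark) o;
        return timestamp == other.timestamp && useProcessingTime == other.useProcessingTime;
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, useProcessingTime);
    }

    @Override
    public String toString() {
        return "Watermark(" + timestamp + ", " + useProcessingTime + ")";
    }
}
```

Keeping the old constructor is what makes the change additive: code that never mentions `useProcessingTime` keeps producing plain event time watermarks.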

...

  • `AbstractStreamOperator` and `AbstractStreamOperatorV2` manage and trigger event time and processing time timers through `InternalTimeServiceManager`.

  • The operator calls `InternalTimeServiceManager#advanceWatermark` to advance the event time and trigger event time timers only when it receives a watermark.

...

  • It keeps track of the maximum watermark timestamp seen and the status (active/idle, aligned/unaligned) for each input channel.

    • Each input channel can be either active or idle. `StatusWatermarkValve` updates the status of each input channel when it receives a `WatermarkStatus` from upstream.

    • Each input channel can be either aligned or unaligned. An input channel is considered aligned only when it is active and its watermark is greater than or equal to the last watermark timestamp of the input.

  • It gets the minimum watermark timestamp among all the aligned input channels and uses it as the watermark for the input. If the new watermark timestamp is greater than the previous watermark timestamp, it invokes the `processWatermark` method.
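The alignment and minimum rules above can be sketched as a small state machine. This is a simplified model, not Flink's `StatusWatermarkValve`: the class `ValveSketch`, its methods, and returning an `OptionalLong` in place of invoking `processWatermark` are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.OptionalLong;

/** Hypothetical, simplified model of the valve rules above (not Flink's StatusWatermarkValve). */
final class ValveSketch {
    private final long[] channelWatermarks;   // maximum watermark seen per channel
    private final boolean[] active;           // true = active, false = idle
    private long lastOutput = Long.MIN_VALUE; // last watermark emitted for the input

    ValveSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        active = new boolean[numChannels];
        Arrays.fill(active, true);
    }

    /** A watermark arrived on a channel; returns the new input watermark if it advanced. */
    OptionalLong onWatermark(int channel, long timestamp) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], timestamp);
        return recompute();
    }

    /** A WatermarkStatus arrived on a channel; returns the new input watermark if it advanced. */
    OptionalLong onStatus(int channel, boolean isActive) {
        active[channel] = isActive;
        return recompute();
    }

    private OptionalLong recompute() {
        long min = Long.MAX_VALUE;
        boolean anyAligned = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            // Aligned = active and not behind the last watermark of the input.
            if (active[i] && channelWatermarks[i] >= lastOutput) {
                anyAligned = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        if (anyAligned && min > lastOutput) {
            lastOutput = min;
            return OptionalLong.of(min); // where the valve would invoke processWatermark
        }
        return OptionalLong.empty();
    }
}
```

For example, with two channels at 5 and 3 the input watermark is 3; marking the second channel idle lets it advance to 5, and when that channel becomes active again at 3 it stays unaligned until its watermark catches up.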

...

The table below shows the effective watermark of the input given the watermarks of its two input channels.

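| InputChannel1 \ InputChannel2                     | currentTimestamp = t2, useProcessingTime = true | currentTimestamp = t2, useProcessingTime = false |
|---------------------------------------------------|-------------------------------------------------|--------------------------------------------------|
| currentTimestamp = t1, useProcessingTime = true   | Watermark(MIN(t1, t2), true)                    | Watermark(t2, false)                             |
| currentTimestamp = t1, useProcessingTime = false  | Watermark(t1, false)                            | Watermark(MIN(t1, t2), false)                    |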
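The four combinations of channel watermarks can be written as a small combine function. This sketch covers exactly the two-channel case, assuming both channels are active and aligned; `ChannelWatermark`, `ChartCombiner`, and `combine` are hypothetical names.

```java
/** Hypothetical (timestamp, useProcessingTime) pair for one input channel. */
final class ChannelWatermark {
    final long timestamp;
    final boolean useProcessingTime;

    ChannelWatermark(long timestamp, boolean useProcessingTime) {
        this.timestamp = timestamp;
        this.useProcessingTime = useProcessingTime;
    }

    @Override
    public String toString() {
        return "Watermark(" + timestamp + ", " + useProcessingTime + ")";
    }
}

final class ChartCombiner {
    /** Effective input watermark for two channels, one branch per combination. */
    static ChannelWatermark combine(ChannelWatermark c1, ChannelWatermark c2) {
        if (c1.useProcessingTime && c2.useProcessingTime) {
            // Both channels follow processing time: keep the flag and take the minimum.
            return new ChannelWatermark(Math.min(c1.timestamp, c2.timestamp), true);
        }
        if (c1.useProcessingTime) {
            // Only channel 2 carries event time, so its timestamp decides.
            return new ChannelWatermark(c2.timestamp, false);
        }
        if (c2.useProcessingTime) {
            // Only channel 1 carries event time, so its timestamp decides.
            return new ChannelWatermark(c1.timestamp, false);
        }
        // Both channels carry event time: the usual minimum.
        return new ChannelWatermark(Math.min(c1.timestamp, c2.timestamp), false);
    }
}
```

The key point is the mixed case: a channel that follows processing time does not hold back an event time channel, and the result loses the `useProcessingTime` flag.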

...


IndexedCombinedWatermarkStatus

The `IndexedCombinedWatermarkStatus` represents the combined value and status of a watermark for a fixed number of input partial watermarks. The operator advances the event time of `timeServiceManager` based on the combined watermark. `IndexedCombinedWatermarkStatus` ensures that the watermark timestamp of an operator with multiple inputs is the minimum timestamp among all the inputs and that it increases monotonically.

Original behavior:

  • It keeps track of the maximum watermark timestamp seen and the status (active/idle) for each input.

    • Each input can be either active or idle. `IndexedCombinedWatermarkStatus` updates the status of an input when it receives a `WatermarkStatus` from that input.

  • It uses the minimum watermark timestamp among all the active inputs as the watermark of the operator. If the new watermark timestamp is greater than the previous one, it advances the event time of `timeServiceManager`.

Updated Behavior:

  • It keeps track of the maximum watermark timestamp seen, the status (active/idle), and the `useProcessingTime` flag for each input.

    • Each input can be either active or idle. `IndexedCombinedWatermarkStatus` updates the status of an input when it receives a `WatermarkStatus` from that input. An input whose `useProcessingTime` is set to true is always considered active.

  • If at least one active input has `useProcessingTime` set to false, the event time of the operator is the minimum watermark timestamp among all the active inputs. If all the active inputs have `useProcessingTime` set to true, the event time of the operator is kept in sync with the system time.
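A minimal sketch of the updated combination rule, following the bullets above literally and assuming a fixed number of inputs plus an injected clock (`LongSupplier`) so that the system time case can be tested deterministically. `CombinedStatusSketch` and its methods are hypothetical and do not mirror the API of Flink's `IndexedCombinedWatermarkStatus`.

```java
import java.util.Arrays;
import java.util.function.LongSupplier;

/** Hypothetical model of the updated rule (not Flink's IndexedCombinedWatermarkStatus API). */
final class CombinedStatusSketch {
    private final long[] maxTimestamp;         // maximum watermark timestamp seen per input
    private final boolean[] idle;              // status reported via WatermarkStatus
    private final boolean[] useProcessingTime; // flag of the latest watermark per input
    private final LongSupplier systemClock;    // injected so tests can control "now"
    private long eventTime = Long.MIN_VALUE;   // combined event time, never decreases

    CombinedStatusSketch(int numInputs, LongSupplier systemClock) {
        maxTimestamp = new long[numInputs];
        Arrays.fill(maxTimestamp, Long.MIN_VALUE);
        idle = new boolean[numInputs];
        useProcessingTime = new boolean[numInputs];
        this.systemClock = systemClock;
    }

    void onWatermark(int input, long timestamp, boolean procTime) {
        maxTimestamp[input] = Math.max(maxTimestamp[input], timestamp);
        useProcessingTime[input] = procTime;
    }

    void onStatus(int input, boolean isIdle) {
        idle[input] = isIdle;
    }

    private boolean isActive(int input) {
        // An input with useProcessingTime = true is always treated as active.
        return useProcessingTime[input] || !idle[input];
    }

    /** Recomputes and returns the operator's event time. */
    long currentEventTime() {
        boolean anyActive = false;
        boolean allProcTime = true;
        long min = Long.MAX_VALUE;
        for (int i = 0; i < maxTimestamp.length; i++) {
            if (!isActive(i)) continue;
            anyActive = true;
            allProcTime &= useProcessingTime[i];
            min = Math.min(min, maxTimestamp[i]);
        }
        if (!anyActive) return eventTime; // nothing to combine
        // All active inputs follow processing time: use the system clock.
        long candidate = allProcTime ? systemClock.getAsLong() : min;
        eventTime = Math.max(eventTime, candidate); // keep it monotonic
        return eventTime;
    }
}
```

Injecting the clock is only a testing convenience; an operator would read the processing time service instead.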


TemporalRowTimeJoinOperator

After the modifications to `AbstractStreamOperator` proposed in this FLIP, `TemporalRowTimeJoinOperator`, as a subclass of `AbstractStreamOperator`, can support processing time temporal joins when both the build side and the probe side send watermarks with `useProcessingTime` set to true, e.g. from `MySqlSource` and `KafkaSource`.

To optimize performance, when both the probe side and the build side receive a watermark with `useProcessingTime` set to true, the operator can process data in a pure processing time mode without timers: probe side records join directly with the build side, and the build side only needs to keep the latest record, without triggering clean-up based on event time.
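The pure processing time mode can be illustrated with a map that keeps only the latest build side record per join key, while each probe record is joined immediately without registering any timer. This is a simplified in-memory sketch with hypothetical names (`ProcTimeJoinSketch`, `onBuild`, `onProbe`); the real operator uses keyed state and `RowData`, and deletions from the build side are not modeled here. Passing `null` for a missing match mirrors the LEFT JOIN in the example query.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

/** Hypothetical in-memory model of the timer-free processing time join path. */
final class ProcTimeJoinSketch<K, B, P, R> {
    // Only the latest build side record per key is kept; no event time clean-up timers.
    private final Map<K, B> latestBuild = new HashMap<>();
    private final BiFunction<P, B, R> joiner;

    ProcTimeJoinSketch(BiFunction<P, B, R> joiner) {
        this.joiner = joiner;
    }

    /** Build side: overwrite any previous version for the key. */
    void onBuild(K key, B record) {
        latestBuild.put(key, record);
    }

    /** Probe side: join right away with the current build version (null if none). */
    R onProbe(K key, P probe) {
        return joiner.apply(probe, latestBuild.get(key));
    }
}
```

For example, after `onBuild(1, "f")` followed by `onBuild(1, "m")`, a probe for key 1 sees only `"m"`, and a probe for an unknown key is joined with `null`.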


Flink SQL Planner

We need to update the Flink SQL planner to select the appropriate temporal join operator based on the time attribute and the `WatermarkCharacteristic` of the source.

Original behavior:

  • If the temporal join is based on row time (event time), the `TemporalRowTimeJoinOperator` is used.

  • If the temporal join is based on processing time:

    • If the `TemporalFunctionJoin` syntax is used, the `TemporalProcessTimeJoinOperator` is used.

    • If the `TemporalFunctionJoin` syntax is not used, an exception is thrown.

Updated behavior:

  • If the temporal join is based on row time (event time), the `TemporalRowTimeJoinOperator` is used. 

  • If the temporal join is based on processing time:

    • If the `WatermarkCharacteristic` of any source is `UNDEFINED`:

      • If the `TemporalFunctionJoin` syntax is used, the `TemporalProcessTimeJoinOperator` is used.

      • If the `TemporalFunctionJoin` syntax is not used, an exception is thrown.

    • Otherwise, the `TemporalRowTimeJoinOperator` is used.
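The updated selection rule reduces to a small decision function. This is a sketch, not planner code: `JoinOperatorChoice`, `choose`, and its boolean parameters are hypothetical, `WatermarkCharacteristic` is reduced to the two values mentioned on this page, and the exception type and message are placeholders for whatever error the planner actually raises.

```java
import java.util.Arrays;

/** Hypothetical summary of the planner rule above (not actual planner code). */
final class JoinOperatorChoice {
    // Reduced to the two values mentioned on this page.
    enum WatermarkCharacteristic { UNDEFINED, ANY_WATERMARK }

    // ROW_TIME = TemporalRowTimeJoinOperator, PROCESS_TIME = TemporalProcessTimeJoinOperator
    enum Operator { ROW_TIME, PROCESS_TIME }

    static Operator choose(boolean rowTime, boolean temporalFunctionJoinSyntax,
                           WatermarkCharacteristic... sources) {
        if (rowTime) {
            return Operator.ROW_TIME;
        }
        if (!Arrays.asList(sources).contains(WatermarkCharacteristic.UNDEFINED)) {
            // Every source can send watermarks with useProcessingTime.
            return Operator.ROW_TIME;
        }
        if (temporalFunctionJoinSyntax) {
            return Operator.PROCESS_TIME;
        }
        // Placeholder for the planner's actual validation error.
        throw new UnsupportedOperationException(
                "Processing time temporal join is not supported for these sources");
    }
}
```

Note that the original processing time path is unchanged whenever at least one source is `UNDEFINED`, which is what keeps existing jobs on their current operator.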


Example Usage

The following Flink SQL example demonstrates how to perform a processing time temporal join after this FLIP.


```sql
-- Create a MySQL CDC source table (dimension table)
CREATE TEMPORARY TABLE user_info (
    user_id INTEGER PRIMARY KEY NOT ENFORCED,
    gender STRING
) WITH (
    'connector' = 'mysql-cdc',
    'database-name' = 'example_database',
    'hostname' = 'localhost',
    'username' = 'root',
    'password' = 'root',
    'table-name' = 'user_info'
);

-- Create a datagen source table (fact table)
CREATE TEMPORARY TABLE click_event (
    user_id INTEGER,
    item_id INTEGER,
    proctime AS PROCTIME()
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.user_id.min' = '0',
    'fields.user_id.max' = '9'
);

-- Create a print sink table
CREATE TEMPORARY TABLE print_sink (
    user_id INTEGER,
    item_id INTEGER,
    gender STRING
) WITH (
    'connector' = 'print'
);

-- Processing time temporal join
INSERT INTO print_sink
SELECT click_event.user_id AS user_id, item_id, gender FROM click_event
LEFT JOIN user_info FOR SYSTEM_TIME AS OF click_event.proctime
ON click_event.user_id = user_info.user_id;
```


Compatibility, Deprecation, and Migration Plan

The modifications to the existing Watermark functionality are backward compatible: we only add new APIs, so existing code that handles Watermarks continues to work. Additionally, the `useProcessingTime` field of Watermark instances defaults to false, preserving the existing semantics.

With the updates to `AbstractStreamOperator`/`AbstractStreamOperatorV2` proposed in this FLIP, all operators can support Watermarks with `useProcessingTime` and correctly trigger their timers based on either event time or the system time. Sources that have not been updated always send Watermarks with `useProcessingTime` set to false; in this case, the behavior of the operators remains unchanged, ensuring compatibility with existing jobs.

In this FLIP, we only update the MySQL CDC Source and the Kafka Source to support sending watermarks with `useProcessingTime`, so only these two sources can be used to perform processing time temporal joins. To allow more sources to do so, we need to gradually update source implementations to support sending Watermarks with `useProcessingTime` semantics.

Here are the guidelines for updating a source implementation:

  1. For any source, if the user specifies a `WatermarkStrategy` with `NoWatermarkGenerator`, the source may send a watermark with `useProcessingTime` set to true, depending on the use case and the characteristics of the source. For example:

    1. `MySqlCdcSource` sends the watermark with `useProcessingTime` set to true at the beginning of the binlog phase.

    2. `KafkaSource` sends the watermark with `useProcessingTime` set to true at the beginning if `NoWatermarkGenerator` is used.

    3. A hybrid source can send the watermark with `useProcessingTime` set to true when switching from historical data to real time data.

  2. The source should implement the `getWatermarkCharacteristic` method to return `ANY_WATERMARK`.
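A sketch of the two guidelines for a source that moves from a historical phase to a real time phase (such as the binlog phase of a CDC source, or a hybrid source switching to real time data). It assumes the user configured `NoWatermarkGenerator`, so the only watermark emitted is the one with `useProcessingTime` set to true at the switch, and it assumes the switch time is used as that watermark's timestamp. `PhasedSourceSketch`, `onPhaseSwitch`, `EmittedWatermark`, and the nested `WatermarkCharacteristic` enum are hypothetical stand-ins, not the Flink `Source` or `SourceReader` interfaces.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical source following the two guidelines (not Flink's Source/SourceReader API). */
final class PhasedSourceSketch {
    enum WatermarkCharacteristic { UNDEFINED, ANY_WATERMARK }

    /** (timestamp, useProcessingTime) pair sent downstream. */
    static final class EmittedWatermark {
        final long timestamp;
        final boolean useProcessingTime;

        EmittedWatermark(long timestamp, boolean useProcessingTime) {
            this.timestamp = timestamp;
            this.useProcessingTime = useProcessingTime;
        }
    }

    private final boolean noWatermarkGenerator; // user configured NoWatermarkGenerator
    private final List<EmittedWatermark> output = new ArrayList<>();
    private boolean realTimePhase;

    PhasedSourceSketch(boolean noWatermarkGenerator) {
        this.noWatermarkGenerator = noWatermarkGenerator;
    }

    /** Guideline 2: report that this source may send any watermark. */
    WatermarkCharacteristic getWatermarkCharacteristic() {
        return WatermarkCharacteristic.ANY_WATERMARK;
    }

    /** Guideline 1: on entering the real time phase, send one processing time watermark. */
    void onPhaseSwitch(long switchTimeMillis) {
        if (noWatermarkGenerator && !realTimePhase) {
            output.add(new EmittedWatermark(switchTimeMillis, true));
        }
        realTimePhase = true;
    }

    List<EmittedWatermark> emitted() {
        return output;
    }
}
```

If the user configured a real watermark generator instead, this sketch sends nothing special at the switch, so downstream operators keep their event time behavior.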


...