...

After the update, we can still ensure that the effective watermark timestamp passed to the `processWatermark` method is the minimum among all input channels of that input, and that it is monotonically increasing.


The chart below shows the effective watermark of the input given the watermarks of the two input channels.

| InputChannel1 \ InputChannel2 | currentTimestamp = t2, useProcessingTime = true | currentTimestamp = t2, useProcessingTime = false |
| --- | --- | --- |
| currentTimestamp = t1, useProcessingTime = true | Watermark(MIN(t1, t2), true) | Watermark(t2, false) |
| currentTimestamp = t1, useProcessingTime = false | Watermark(t1, false) | Watermark(MIN(t1, t2), false) |
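
The combination rule in the chart above can be sketched in plain Java as follows. This is only an illustration: `SketchWatermark` and `combine` are hypothetical names, not the classes proposed here or existing Flink classes, and the sketch covers only the per-pair combination, not how an operator keeps the emitted value monotonically increasing over time.

```java
/** Hypothetical stand-in for the proposed watermark; not an actual Flink class. */
final class SketchWatermark {
    final long timestamp;
    final boolean useProcessingTime;

    SketchWatermark(long timestamp, boolean useProcessingTime) {
        this.timestamp = timestamp;
        this.useProcessingTime = useProcessingTime;
    }

    /** Combines the watermarks of two input channels following the chart. */
    static SketchWatermark combine(SketchWatermark a, SketchWatermark b) {
        if (a.useProcessingTime == b.useProcessingTime) {
            // Both channels are in the same mode: take the minimum timestamp
            // and keep the shared useProcessingTime flag.
            return new SketchWatermark(
                    Math.min(a.timestamp, b.timestamp), a.useProcessingTime);
        }
        // Exactly one channel has useProcessingTime = false: that channel
        // alone determines the effective watermark.
        return a.useProcessingTime ? b : a;
    }

    @Override
    public String toString() {
        return "Watermark(" + timestamp + ", " + useProcessingTime + ")";
    }
}
```

For example, combining a channel at `(t1, true)` with a channel at `(t2, false)` yields `Watermark(t2, false)`, which matches the top-right cell of the chart.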

...

We need to update the Flink SQL planner to select the appropriate temporal join operator based on the time attribute and the `WatermarkCharacteristic` of the source.

Original behavior:

  • If the temporal join is based on row time (event time), the `TemporalRowTimeJoinOperator` is used.

  • If the temporal join is based on processing time:

    • If the `TemporalFunctionJoin` syntax is used, the `TemporalProcessTimeJoinOperator` is used.

    • If the `TemporalFunctionJoin` syntax is not used, an exception is thrown.

...

  • If the temporal join is based on row time (event time), the `TemporalRowTimeJoinOperator` is used. 

  • If the temporal join is based on processing time:

    • If the `WatermarkCharacteristic` of any source is `UNDEFINED`:

      • If the `TemporalFunctionJoin` syntax is used, the `TemporalProcessTimeJoinOperator` is used.

      • If the `TemporalFunctionJoin` syntax is not used, an exception is thrown.

    • Otherwise, use `TemporalRowTimeJoinOperator`.
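
The second decision list above can be sketched as a small selection function. This is a simplified illustration, not the planner code: `TimeAttribute`, `TemporalJoinOperatorSelector`, `select`, and the `usesTemporalFunctionJoin` flag are hypothetical names, operator classes are represented by their names as strings, and the exception type is an assumption. Only the `WatermarkCharacteristic` values `UNDEFINED` and `ANY_WATERMARK` are taken from the text.

```java
import java.util.List;

/** Values named in this document; the enum itself is redeclared for the sketch. */
enum WatermarkCharacteristic { UNDEFINED, ANY_WATERMARK }

/** Hypothetical marker for the time attribute the temporal join is based on. */
enum TimeAttribute { ROW_TIME, PROCESSING_TIME }

final class TemporalJoinOperatorSelector {

    /** Returns the name of the operator the planner would pick. */
    static String select(
            TimeAttribute timeAttribute,
            boolean usesTemporalFunctionJoin,
            List<WatermarkCharacteristic> sourceCharacteristics) {
        if (timeAttribute == TimeAttribute.ROW_TIME) {
            return "TemporalRowTimeJoinOperator";
        }
        // Processing-time join: fall back to the original behavior if any
        // source has an UNDEFINED watermark characteristic.
        if (sourceCharacteristics.contains(WatermarkCharacteristic.UNDEFINED)) {
            if (usesTemporalFunctionJoin) {
                return "TemporalProcessTimeJoinOperator";
            }
            // Assumed exception type; the text only says an exception is thrown.
            throw new UnsupportedOperationException(
                    "Processing-time temporal join requires the TemporalFunctionJoin syntax");
        }
        // Every source declares a watermark characteristic other than UNDEFINED.
        return "TemporalRowTimeJoinOperator";
    }
}
```

The row-time check comes first, so the `WatermarkCharacteristic` of the sources only affects joins based on processing time.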

...

  1. For any source, if the user specifies a `WatermarkStrategy` with `NoWatermarkGenerator`, the source may send a watermark with `useProcessingTime` set to true, depending on the use case and the characteristics of the source. For example,

    1. `MySqlCdcSource` sends the watermark with `useProcessingTime` set to true at the beginning of the binlog phase.

    2. A `KafkaSource` sends the watermark with `useProcessingTime` set to true at the beginning if `NoWatermarkGenerator` is used.

    3. A hybrid source can send the watermark with `useProcessingTime` set to true when switching from historical data to real-time data.

  2. The source should implement the `getWatermarkCharacteristic` method to return `ANY_WATERMARK`.
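
As a rough sketch of step 2, a source could declare its characteristic as shown below. The interface `SketchSource` and the class `SketchHybridSource` are hypothetical stand-ins, and the default of `UNDEFINED` for sources that do not override the method is an assumption made for illustration. Only the method name `getWatermarkCharacteristic` and the two enum values come from the text.

```java
/** Values named in this document; the enum itself is redeclared for the sketch. */
enum WatermarkCharacteristic { UNDEFINED, ANY_WATERMARK }

/** Hypothetical stand-in for a source interface; not the actual Flink Source API. */
interface SketchSource {
    // Assumption: sources that do not override this report UNDEFINED.
    default WatermarkCharacteristic getWatermarkCharacteristic() {
        return WatermarkCharacteristic.UNDEFINED;
    }
}

/** Hypothetical source that may send watermarks with useProcessingTime = true. */
final class SketchHybridSource implements SketchSource {
    @Override
    public WatermarkCharacteristic getWatermarkCharacteristic() {
        return WatermarkCharacteristic.ANY_WATERMARK;
    }
}
```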

...

When invoking methods like `DataStream.windowAll` or `KeyedDataStream.window`, users need to select the appropriate `WindowAssigner` based on processing time or event time.

  • org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
    • TumblingEventTimeWindows vs TumblingProcessingTimeWindows

    • SlidingEventTimeWindows vs SlidingProcessingTimeWindows

    • DynamicEventTimeSessionWindows vs DynamicProcessingTimeSessionWindows

...

    • EventTimeSessionWindows vs ProcessingTimeSessionWindows

When implementing custom `ProcessFunction` or `KeyedProcessFunction`, users need to differentiate between registering a timer for processing time or event time using the `TimerService`.

  • org.apache.flink.streaming.api.TimerService

    • registerProcessingTimeTimer vs registerEventTimeTimer

Appendix - Update MySqlSource

...