...

[This FLIP proposal is joint work between Xuannan Su and Dong Lin]


Motivation

Assume the user needs to perform a temporal join where the probe side comes from Kafka and the build side comes from a MySQL CDC source (with a snapshot reading phase and a binlog reading phase), and none of the input data carries event-time information. The user requires each record on the probe side to be joined with at least the data from the build side's snapshot phase. In other words, the join operator needs to wait until the build side has finished reading its snapshot phase before it processes the probe side's data.

...

The table below shows the effective watermark of the input given the watermarks of the two input channels.

| InputChannel1 \ InputChannel2 | currentTimestamp = t2, useProcessingTime = true | currentTimestamp = t2, useProcessingTime = false |
| --- | --- | --- |
| currentTimestamp = t1, useProcessingTime = true | Watermark(MIN(t1, t2), true) | Watermark(t2, false) |
| currentTimestamp = t1, useProcessingTime = false | Watermark(t1, false) | Watermark(MIN(t1, t2), false) |
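The combination rule for two input channels can be illustrated with a minimal sketch. The class names `SketchWatermark` and `WatermarkCombiner` are hypothetical stand-ins introduced only for illustration; they are not the classes proposed in this FLIP, and the actual implementation may differ.

```java
/** Hypothetical stand-in for a watermark carrying the useProcessingTime flag. */
final class SketchWatermark {
    final long timestamp;
    final boolean useProcessingTime;

    SketchWatermark(long timestamp, boolean useProcessingTime) {
        this.timestamp = timestamp;
        this.useProcessingTime = useProcessingTime;
    }
}

/** Hypothetical helper that combines the watermarks of two input channels. */
final class WatermarkCombiner {
    private WatermarkCombiner() {}

    static SketchWatermark combine(SketchWatermark a, SketchWatermark b) {
        if (a.useProcessingTime == b.useProcessingTime) {
            // Both channels are in the same mode: take the minimum timestamp
            // and keep the shared flag.
            return new SketchWatermark(
                    Math.min(a.timestamp, b.timestamp), a.useProcessingTime);
        }
        // Exactly one channel uses processing time: the effective watermark
        // is the watermark of the channel that does not.
        return a.useProcessingTime ? b : a;
    }
}
```

For example, under these assumptions, combining a watermark `(5, true)` with `(9, false)` yields `(9, false)`, which corresponds to the `Watermark(t2, false)` case.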


IndexedCombinedWatermarkStatus

...

  1. For any source, if the user specifies a `WatermarkStrategy` with `NoWatermarkGenerator`, the source may send a watermark with `useProcessingTime` set to true, depending on the use case and the characteristics of the source. For example:

    1. `MySqlCdcSource` sends the watermark with `useProcessingTime` set to true at the beginning of the binlog phase.

    2. A `KafkaSource` sends the watermark with `useProcessingTime` set to true at the beginning if `NoWatermarkGenerator` is used.

    3. A hybrid source can send the watermark with `useProcessingTime` set to true when switching from historical data to real time data.

  2. The source should implement the `getWatermarkCharacteristic` method to return `ANY_WATERMARK`.

...

...

Test Plan

Describe in a few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?


Rejected Alternatives

...

Not yet.

Future Work

After this FLIP, we can unify the APIs for processing time and event time. Currently, in the DataStream API, users need to explicitly differentiate between processing time and event time in several places when expressing job logic. The following are some examples of API pairs that distinguish between event time and processing time.

When invoking methods like `DataStream.windowAll` or `KeyedStream.window`, users need to select the appropriate `WindowAssigner` based on processing time or event time.

  • org.apache.flink.streaming.api.windowing.assigners.WindowAssigner

    • TumblingEventTimeWindows vs TumblingProcessingTimeWindows

    • SlidingEventTimeWindows vs SlidingProcessingTimeWindows

    • DynamicEventTimeSessionWindows vs DynamicProcessingTimeSessionWindows

    • EventTimeSessionWindows vs ProcessingTimeSessionWindows

When implementing a custom `ProcessFunction` or `KeyedProcessFunction`, users need to choose between registering a processing-time timer and an event-time timer with the `TimerService`.

  • org.apache.flink.streaming.api.TimerService

    • registerProcessingTimeTimer vs registerEventTimeTimer

Appendix - Update MySqlSource

Add a method `useProcessingTimeDuringBinlog` to `MySqlSourceBuilder` to specify that the `MySqlSource` should send a watermark with `useProcessingTime` set to true at the beginning of the binlog phase.

```java
@PublicEvolving
public class MySqlSourceBuilder<T> {
    ...

    /** Use processing time during the binlog phase. */
    public MySqlSourceBuilder<T> useProcessingTimeDuringBinlog() {
        ...
    }
}
```

Note that the user can only use `NoWatermarkGenerator` when `MySqlSource` uses processing time during the binlog phase. Otherwise, an exception is thrown.

Update `MySqlSource` to override the `getWatermarkCharacteristic` method to return `ANY_WATERMARK`.