[This FLIP proposal is joint work between Xuannan Su and Dong Lin]

Motivation

Suppose a user needs to perform a temporal join where the Probe Side comes from Kafka and the Build Side comes from a MySQL CDC Source (which has a snapshot reading phase and a binlog reading phase), and none of the input data carries event-time information. The user requires each record on the Probe Side to be joined with at least the data from the Build Side's snapshot phase. In other words, the join operator needs to wait until the Build Side has finished reading its snapshot-phase data before it processes the Probe Side's data.

Currently, Flink cannot support the use case above. Flink SQL does not support the usage of "SYSTEM_TIME AS OF syntax used in temporal join with the latest version of any view/table." This is because the TemporalProcessTimeJoinOperator supports temporal joins based on processing time but gives the Probe Side no way to wait for data from the Build Side in processing-time mode. The operator may start processing the Probe Side's data before it has read the data from the Build Side's snapshot phase, so Probe Side records may fail to join with any data, and the output does not meet the user's requirements. For more details, see FLINK-19830.

This document proposes a new API and mechanism at the DataStream level that allows a Source (e.g., MySQL CDC Source) to send a watermark telling the downstream operator to advance its event time based on the system time. In the temporal join operator, the Probe Side can then wait for the Build Side until the Build Side receives this watermark and starts advancing its event time based on the system time.
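
The waiting behavior described above can be illustrated with a small toy model. This is only a sketch in plain Python, not the proposed Flink API: the class `ToyTemporalJoin` and its methods `on_build`, `on_build_watermark`, and `on_probe` are hypothetical names invented for this illustration, and the "watermark" is reduced to a single signal meaning that the Build Side has finished its snapshot phase.

```python
from typing import Any, Dict, List, Tuple


class ToyTemporalJoin:
    """Toy model of a Probe Side that waits for the Build Side (not Flink code)."""

    def __init__(self) -> None:
        self.build_state: Dict[Any, Any] = {}            # latest Build Side value per key
        self.buffered_probes: List[Tuple[Any, Any]] = []  # probes held back while waiting
        self.build_ready = False                          # True once the signal has arrived

    def on_build(self, key: Any, value: Any) -> None:
        # Snapshot and binlog records both upsert the latest version for the key.
        self.build_state[key] = value

    def on_build_watermark(self) -> List[tuple]:
        # Hypothetical signal: the Build Side has finished its snapshot phase.
        # Buffered probes are released and joined in arrival order.
        self.build_ready = True
        pending, self.buffered_probes = self.buffered_probes, []
        return [row for key, payload in pending for row in self._join(key, payload)]

    def on_probe(self, key: Any, payload: Any) -> List[tuple]:
        # Before the signal, hold the probe back instead of joining it
        # against a partially loaded Build Side.
        if not self.build_ready:
            self.buffered_probes.append((key, payload))
            return []
        return self._join(key, payload)

    def _join(self, key: Any, payload: Any) -> List[tuple]:
        # Inner join against the latest Build Side version of the key.
        if key in self.build_state:
            return [(key, payload, self.build_state[key])]
        return []


join = ToyTemporalJoin()
early = join.on_probe("k1", "order-1")   # arrives before the snapshot is read: buffered
join.on_build("k1", "rate-A")            # snapshot-phase record
flushed = join.on_build_watermark()      # snapshot done: buffered probe is joined now
late = join.on_probe("k1", "order-2")    # joined immediately from here on
print(early, flushed, late)
```

In this model the early probe record is not dropped or emitted unjoined; it is joined only after the Build Side signals that its snapshot data is complete, which is the guarantee the use case above asks for.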

...