Discussion thread: https://lists.apache.org/thread/6m47ggxn9sl0qgfgyq20ytyzlw42crxj
Vote thread: -
JIRA: -
Release: -

...

[This FLIP proposal is a joint work between Xuannan Su and Dong Lin.]


Table of Contents

Motivation

Suppose the user needs to perform a processing-time temporal join where the Probe Side records are obtained from a Kafka Source and the Build Side records are obtained from a MySQL CDC Source, which consists of a snapshot reading phase followed by a binlog reading phase. Notably, all input records lack event-time information. The user's requirement is that each record on the Probe Side must be joined with at least the records from the Build Side's snapshot phase. In other words, the Join operator needs to wait for the completion of the Build Side's snapshot phase before processing the Probe Side's data.

...

This document proposes the introduction of APIs that enable source operators (e.g., HybridSource, MySQL CDC Source) to send watermarks to downstream operators, signaling the start of increasing watermarks based on the system time. Apart from supporting processing-time temporal joins, this FLIP lays the groundwork for simplifying DataStream APIs, including KeyedStream#window(...), so that users would no longer be required to explicitly distinguish between TumblingEventTimeWindows and TumblingProcessingTimeWindows, resulting in a more intuitive user experience.

Terminology and Background

The FLIP proposes changes to Flink's watermark and timestamp concepts. To better understand the underlying design, let's recap the relevant concepts in this section.

Probe Side: Refers to the left side of the stream in a temporal join, also known as the Fact Table. Typically, data on the Probe Side doesn't need to be retained after processing.

Build Side: Represents the right side of the stream in a temporal join, often referred to as the Dimension Table. It can be a versioned table and typically contains the latest data for each key.

...

  • When creating a source, the user provides a WatermarkStrategy to StreamExecutionEnvironment#fromSource.
  • If the source natively supports event time (e.g., KafkaSource) or the user provides a custom TimestampAssigner in the WatermarkStrategy to extract the timestamp from the record, Flink will add the timestamp to the record. Otherwise, the timestamp on the record will be set to Long.MIN_VALUE.
  • If the user employs NoWatermarkGenerator in the WatermarkStrategy, the job will not generate watermarks. Otherwise, the job will periodically emit watermarks, and the watermark value depends on event time. The frequency of watermark emission is determined by pipeline.auto-watermark-interval, with a default value of 200ms (see the sketch below).
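For illustration, here is a minimal sketch of how a WatermarkStrategy is typically provided to StreamExecutionEnvironment#fromSource. The ClickEvent type, topic name, and deserialization schema are hypothetical; the sketch only shows where the TimestampAssigner and pipeline.auto-watermark-interval come into play.

Code Block
languagejava
titleWatermarkStrategy example (illustrative)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Frequency of periodic watermark emission; defaults to 200ms
// (pipeline.auto-watermark-interval).
env.getConfig().setAutoWatermarkInterval(200L);

// ClickEvent and ClickEventDeserializationSchema are hypothetical.
KafkaSource<ClickEvent> source =
        KafkaSource.<ClickEvent>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("click_event")
                .setValueOnlyDeserializer(new ClickEventDeserializationSchema())
                .build();

// Extract the event-time timestamp from each record and generate watermarks
// that tolerate up to 5 seconds of out-of-orderness.
WatermarkStrategy<ClickEvent> watermarkStrategy =
        WatermarkStrategy.<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, recordTimestamp) -> event.getEventTimeMillis());

DataStream<ClickEvent> clicks =
        env.fromSource(source, watermarkStrategy, "click-source");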

Public Interfaces

1) Add a useProcessingTime field to org.apache.flink.api.common.eventtime.Watermark and org.apache.flink.streaming.api.watermark.Watermark.
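As a rough sketch (the exact constructor and accessor signatures below are illustrative, not the final API), the field could be added along the following lines, defaulting to false so that existing semantics are preserved:

Code Block
languagejava
titleWatermark with useProcessingTime (sketch)
@Public
public final class Watermark implements Serializable {

    private final long timestamp;

    // Newly proposed field; false by default to preserve the existing semantics.
    private final boolean useProcessingTime;

    public Watermark(long timestamp) {
        this(timestamp, false);
    }

    public Watermark(long timestamp, boolean useProcessingTime) {
        this.timestamp = timestamp;
        this.useProcessingTime = useProcessingTime;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public boolean useProcessingTime() {
        return useProcessingTime;
    }
}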

...

Code Block
languagejava
titleSourceReader
@Public
public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseable, CheckpointListener {   
    ...

    /**
     * Provide SourceReader with a runnable that can be used to emit watermark. If SourceReader
     * wants to own the responsibility of invoking WatermarkGenerator#onPeriodicEmit,
     * it should override this method and return true. And SourceReader should respect
     * pipeline.auto-watermark-interval if it decides to emit watermark periodically.
     *
     * @return true iff the caller should avoid triggering WatermarkGenerator#onPeriodicEmit.
     */
    default boolean delegateWatermarkPeriodicEmit(Runnable onWatermarkEmit) {
        return false;
    }
}
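To make the intended usage concrete, the following is a hypothetical SourceReader that takes over periodic watermark emission and only starts it once its bounded (snapshot) phase has finished. The class name, split type, and snapshot-completion callback are illustrative and not part of any existing connector.

Code Block
languagejava
titledelegateWatermarkPeriodicEmit usage (illustrative)
public class MyCdcSourceReader implements SourceReader<RowData, MySourceSplit> {

    private Runnable onWatermarkEmit;
    private ScheduledExecutorService watermarkScheduler;

    @Override
    public boolean delegateWatermarkPeriodicEmit(Runnable onWatermarkEmit) {
        // Take over the responsibility of invoking WatermarkGenerator#onPeriodicEmit.
        this.onWatermarkEmit = onWatermarkEmit;
        return true;
    }

    private void onSnapshotPhaseFinished(long autoWatermarkIntervalMs) {
        // Start periodic watermark emission only after the bounded phase completes,
        // respecting pipeline.auto-watermark-interval.
        watermarkScheduler = Executors.newSingleThreadScheduledExecutor();
        watermarkScheduler.scheduleAtFixedRate(
                onWatermarkEmit, 0L, autoWatermarkIntervalMs, TimeUnit.MILLISECONDS);
    }

    // ... remaining SourceReader methods (pollNext, snapshotState, etc.) are omitted ...
}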


Proposed Changes

1) Update SourceOperator behavior

...

We can then remove TemporalProcessTimeJoinOperator because its functionality is covered by TemporalRowTimeJoinOperator.

TODO: check whether it is correct when TemporalJoinFunction syntax is used.


5) Update WatermarkToDataOutput to handle Watermark#useProcessingTime.

...

By incorporating these updates, we ensure that the IndexedCombinedWatermarkStatus maintains the desired behavior of having the minimum watermark timestamp among all inputs while preventing any decrease in the watermark.
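The combination rule can be summarized with a small self-contained illustration (this is not the actual IndexedCombinedWatermarkStatus code, just the semantics it should preserve): the combined watermark is the minimum of the latest watermarks across all inputs, and it is never allowed to go backwards.

Code Block
languagejava
titleWatermark combination semantics (illustrative)
import java.util.Arrays;

final class CombinedWatermarkExample {

    private final long[] inputWatermarks;     // latest watermark seen on each input
    private long combinedWatermark = Long.MIN_VALUE;

    CombinedWatermarkExample(int numberOfInputs) {
        inputWatermarks = new long[numberOfInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    /** Returns true iff the combined watermark advanced. */
    boolean updateWatermark(int inputIndex, long watermark) {
        inputWatermarks[inputIndex] = Math.max(inputWatermarks[inputIndex], watermark);

        long minAcrossInputs = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            minAcrossInputs = Math.min(minAcrossInputs, w);
        }

        // Minimum across all inputs, but the combined watermark never decreases.
        if (minAcrossInputs > combinedWatermark) {
            combinedWatermark = minAcrossInputs;
            return true;
        }
        return false;
    }
}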


Example Usage

Here is a Flink SQL example that demonstrates how to perform a processing-time temporal join after this FLIP.

...

Code Block
languagesql
-- Create mysql cdc source table (dimension table)
CREATE TEMPORARY TABLE user_info (
    user_id INTEGER PRIMARY KEY NOT ENFORCED, 
    gender STRING
) WITH (
    'connector' = 'mysql-cdc',
    'database-name' = 'example_database',
    'hostname' = 'localhost',
    'username' = 'root',
    'password' = 'root',
    'table-name' = 'user_info'
);

-- Create datagen source table (fact table)
CREATE TEMPORARY TABLE click_event (
    user_id INTEGER,
    item_id INTEGER,
    proctime AS PROCTIME()
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.user_id.min' = '0',
    'fields.user_id.max' = '9'
);

-- Create a print sink table
CREATE TEMPORARY TABLE print_sink (
    user_id INTEGER,
    item_id INTEGER,
    gender STRING
) WITH (
    'connector' = 'print'
);

-- Processing time temporal join
INSERT INTO print_sink
SELECT click_event.user_id AS user_id, item_id, gender FROM click_event 
LEFT JOIN user_info FOR SYSTEM_TIME AS OF click_event.proctime
ON click_event.user_id = user_info.user_id;


Compatibility, Deprecation, and Migration Plan

The proposed change might negatively impact the user experience in the following scenario:

...

  • The user writes a Flink SQL program with a processing-time temporal join that does not use the TemporalFunctionJoin syntax,
  • The source on the Build Side internally consists of a bounded phase followed by an unbounded phase (e.g., HybridSource, MySQL CDC) but has not been updated to override SourceReader#delegateWatermarkPeriodicEmit as specified in this FLIP, and
  • The user prefers the job to fail fast rather than produce results where the probe-side records fail to join with the records from the bounded phase of the build-side source.

...

  • We only introduce new APIs, which do not cause existing code handling Watermarks to fail. Moreover, the default setting for the useProcessingTime parameter in Watermark instances is false, preserving the existing semantics.
  • With the updates to AbstractStreamOperator/AbstractStreamOperatorV2 based on this FLIP, all operators can now support Watermarks with the useProcessingTime field and correctly trigger the operator's timer based on event time or system time. For sources that have not been updated, the Watermarks they send always have useProcessingTime set to false. In this case, the behavior of the operators remains unchanged, ensuring compatibility with existing jobs.

Test Plan

The change will be covered with unit and integration tests.

Future Work

After this FLIP, we can unify the API for processing time and event time. Currently, in the DataStream API, users need to explicitly differentiate between processing time and event time in several places when expressing job logic. The following are some examples of API pairs that distinguish between event time and processing time.
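
For instance, with today's DataStream API the user must pick the time semantics when assigning windows (a minimal sketch; the clicks stream and the CountClicksPerWindow function are hypothetical):

Code Block
languagejava
titleEvent-time vs processing-time windows today (illustrative)
// Event time: requires timestamps/watermarks assigned upstream via a WatermarkStrategy.
clicks.keyBy(click -> click.userId)
      .window(TumblingEventTimeWindows.of(Time.minutes(1)))
      .process(new CountClicksPerWindow());

// Processing time: the user must explicitly choose a different window assigner.
clicks.keyBy(click -> click.userId)
      .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
      .process(new CountClicksPerWindow());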

...