...

[This FLIP proposal is a joint work between Xuannan Su and Dong Lin]



Motivation

Assume the user needs to perform a processing-time temporal join where the Probe Side records are obtained from a Kafka Source and the Build Side records are obtained from a MySQL CDC Source, which consists of a snapshot reading phase followed by a binlog reading phase. Notably, all input records lack event-time information. The user's requirement is that each record on the Probe Side must be joined with at least the records from the Build Side's snapshot phase. In other words, the Join operator needs to wait for the completion of the Build Side's snapshot phase before processing the Probe Side's data.

...

This document proposes introducing APIs that allow source operators (e.g., HybridSource, MySQL CDC Source) to send watermarks to downstream operators indicating that the watermark should start increasing according to the system time. In addition to supporting processing-time temporal joins, this FLIP provides the foundation to simplify DataStream APIs such as KeyedStream#window(...), so that users would no longer need to explicitly differentiate between TumblingEventTimeWindows and TumblingProcessingTimeWindows, leading to a more intuitive experience.

Terminology and Background

The FLIP proposes changes to Flink's watermark and timestamp concepts. To better understand the underlying design, let's recap the relevant concepts in this section.

...

  • When creating a source, the user provides a WatermarkStrategy to StreamExecutionEnvironment#fromSource.
  • If the source natively supports event time (e.g., KafkaSource) or the user provides a custom TimestampAssigner in the WatermarkStrategy to extract the timestamp from the record, Flink will add the timestamp to the record. Otherwise, the timestamp on the record will be set to Long.MIN_VALUE.
  • If the user employs NoWatermarkGenerator in the WatermarkStrategy, the job will not generate watermarks. Otherwise, the job will periodically emit watermarks, and the watermark value depends on event time. The frequency of watermark emission is determined by pipeline.auto-watermark-interval, with a default value of 200ms.
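To make this flow concrete, here is a self-contained sketch of how a periodic watermark generator derives the watermark from event time. The classes below are simplified stand-ins for Flink's watermark machinery (the real interfaces live in org.apache.flink.api.common.eventtime and carry more context), not the actual API:

```java
public class WatermarkSketch {
    /**
     * Simplified periodic generator: onEvent tracks event-time progress, and
     * onPeriodicEmit (called every pipeline.auto-watermark-interval, 200 ms by
     * default) derives the watermark from the maximum timestamp seen so far.
     */
    static final class BoundedOutOfOrdernessGenerator {
        private final long maxOutOfOrderness;
        // Records without a timestamp would leave this at Long.MIN_VALUE.
        private long maxTimestamp = Long.MIN_VALUE;

        BoundedOutOfOrdernessGenerator(long maxOutOfOrderness) {
            this.maxOutOfOrderness = maxOutOfOrderness;
        }

        void onEvent(long eventTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        long onPeriodicEmit() {
            return maxTimestamp - maxOutOfOrderness - 1;
        }
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessGenerator gen = new BoundedOutOfOrdernessGenerator(5);
        gen.onEvent(100);
        gen.onEvent(97); // an out-of-order record does not move the watermark back
        System.out.println(gen.onPeriodicEmit()); // 100 - 5 - 1 = 94
    }
}
```

Note that the emitted watermark depends only on the event timestamps of processed records, which is exactly why a source with no event-time information can never advance the watermark today.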

Public Interfaces

1) Add a useProcessingTime field to org.apache.flink.api.common.eventtime.Watermark and org.apache.flink.streaming.api.watermark.Watermark.
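A sketch of what the extended class could look like. Constructor and accessor shapes here are assumptions based on the FLIP text, not the final API; the real Watermark classes carry additional members:

```java
/** Simplified model of a Watermark extended with the proposed useProcessingTime field. */
public final class ProposedWatermark {
    private final long timestamp;
    // Proposed field: when true, downstream operators should start advancing
    // this watermark according to the system time.
    private final boolean useProcessingTime;

    public ProposedWatermark(long timestamp) {
        this(timestamp, false); // default preserves existing event-time semantics
    }

    public ProposedWatermark(long timestamp, boolean useProcessingTime) {
        this.timestamp = timestamp;
        this.useProcessingTime = useProcessingTime;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public boolean useProcessingTime() {
        return useProcessingTime;
    }
}
```

Defaulting the new field to false is what keeps existing jobs and existing Watermark-handling code unaffected, as discussed in the compatibility section.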

...

Code Block
languagejava
titleSourceReader
@Public
public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseable, CheckpointListener {   
    ...

    /**
     * Provides the SourceReader with a runnable that can be used to emit watermarks.
     *
     * <p>If the SourceReader wants to own the responsibility of invoking
     * WatermarkGenerator#onPeriodicEmit, it should override this method and return true.
     * In that case, the SourceReader should respect pipeline.auto-watermark-interval
     * if it decides to emit watermarks periodically.
     *
     * @return true iff the caller should avoid triggering WatermarkGenerator#onPeriodicEmit.
     */
    default boolean delegateWatermarkPeriodicEmit(Runnable onWatermarkEmit) {
        return false;
    }
}
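As a usage sketch, a reader that takes over periodic watermark emission would store the runnable and drive it on its own schedule. The stub interface and the MyReader class below are hypothetical, introduced only for illustration (the real SourceReader interface has many more methods):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DelegatedEmitSketch {
    /** Stub of the relevant slice of SourceReader, for illustration only. */
    interface ReaderWithDelegatedEmit {
        default boolean delegateWatermarkPeriodicEmit(Runnable onWatermarkEmit) {
            return false; // default: the framework keeps triggering onPeriodicEmit
        }
    }

    /** Hypothetical reader that owns periodic watermark emission. */
    static final class MyReader implements ReaderWithDelegatedEmit, AutoCloseable {
        private final ScheduledExecutorService timer =
                Executors.newSingleThreadScheduledExecutor();
        private final long autoWatermarkIntervalMs; // mirrors pipeline.auto-watermark-interval

        MyReader(long autoWatermarkIntervalMs) {
            this.autoWatermarkIntervalMs = autoWatermarkIntervalMs;
        }

        @Override
        public boolean delegateWatermarkPeriodicEmit(Runnable onWatermarkEmit) {
            // Emit on our own schedule; a real reader might, e.g., start emitting
            // only after its snapshot phase has finished.
            timer.scheduleAtFixedRate(onWatermarkEmit, autoWatermarkIntervalMs,
                    autoWatermarkIntervalMs, TimeUnit.MILLISECONDS);
            return true; // tell the caller not to trigger onPeriodicEmit itself
        }

        @Override
        public void close() {
            timer.shutdownNow();
        }
    }
}
```

Returning true shifts responsibility entirely to the reader, which is why the contract insists that such a reader still honor pipeline.auto-watermark-interval.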


Proposed Changes

1) Update SourceOperator behavior

...

By incorporating these updates, we ensure that the IndexedCombinedWatermarkStatus maintains the desired behavior of having the minimum watermark timestamp among all inputs while preventing any decrease in the watermark.
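The combination rule can be sketched in a few lines. This is a simplified model of what IndexedCombinedWatermarkStatus guarantees, not the actual class: the combined watermark is the minimum across all inputs, clamped so it never moves backwards:

```java
import java.util.Arrays;

/** Simplified model of combining per-input watermarks into one non-decreasing watermark. */
public class CombinedWatermarkSketch {
    private final long[] inputWatermarks;
    private long combined = Long.MIN_VALUE;

    public CombinedWatermarkSketch(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE); // no watermark seen yet
    }

    /** Updates one input's watermark and returns the combined watermark. */
    public long onWatermark(int input, long watermark) {
        inputWatermarks[input] = watermark;
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        // Never let the combined watermark decrease, even if an input regresses.
        combined = Math.max(combined, min);
        return combined;
    }
}
```

For example, with two inputs at watermarks 10 and 5 the combined watermark is 5, and if the second input later reports 3, the combined watermark stays at 5 rather than retreating.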


Example Usage

Here is a Flink SQL example that demonstrates how to perform a processing-time temporal join after this FLIP.

...

Code Block
languagesql
-- Create mysql cdc source table (dimension table)
CREATE TEMPORARY TABLE user_info (
    user_id INTEGER PRIMARY KEY NOT ENFORCED, 
    gender STRING
) WITH (
    'connector' = 'mysql-cdc',
    'database-name' = 'example_database',
    'hostname' = 'localhost',
    'username' = 'root',
    'password' = 'root',
    'table-name' = 'user_info'
);

-- Create datagen source table (fact table)
CREATE TEMPORARY TABLE click_event (
    user_id INTEGER,
    item_id INTEGER,
    proctime AS PROCTIME()
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.user_id.min' = '0',
    'fields.user_id.max' = '9'
);

-- Create a print sink table
CREATE TEMPORARY TABLE print_sink (
    user_id INTEGER,
    item_id INTEGER,
    gender STRING
) WITH (
    'connector' = 'print'
);

-- Processing time temporal join
INSERT INTO print_sink
SELECT click_event.user_id AS user_id, item_id, gender FROM click_event 
LEFT JOIN user_info FOR SYSTEM_TIME AS OF click_event.proctime
ON click_event.user_id = user_info.user_id;


Compatibility, Deprecation, and Migration Plan

The proposed change might negatively impact the user experience in the following scenario:

...

  • We only introduce new APIs, which do not cause existing code handling Watermarks to fail. Moreover, the default setting for the useProcessingTime parameter in Watermark instances is false, preserving the existing semantics.
  • With the updates to AbstractStreamOperator/AbstractStreamOperatorV2 based on this FLIP, all operators can now support Watermarks with the useProcessingTime field and correctly trigger the operator's timer based on event time or system time. For sources that have not been updated, the Watermarks they send always have useProcessingTime set to false. In this case, the behavior of the operators remains unchanged, ensuring compatibility with existing jobs.

Test Plan

The change will be covered with unit and integration tests.

Future Work

Currently, in the DataStream API, users need to explicitly differentiate between processing time and event time in several places when expressing job logic. After this FLIP, we can unify the APIs for processing time and event time. The following are some examples of API pairs that distinguish between the two.

...