

Status

Current state

Discussion thread:

JIRA:

Released: 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


[This FLIP proposal is joint work between Xuannan Su and Dong Lin]

Motivation

Consider a user who needs to perform a temporal join where the Probe Side comes from Kafka and the Build Side comes from a MySQL CDC Source (with a snapshot reading phase and a binlog reading phase), and none of the input data carries event-time information. The user requires each record on the Probe Side to be joined with at least the data from the Build Side's snapshot phase. In other words, the join operator needs to wait until the Build Side has finished reading its snapshot phase before processing the Probe Side's data.

Currently, Flink cannot support the use case above. Flink SQL does not support the "SYSTEM_TIME AS OF" syntax in a temporal join with the latest version of any view/table. This is because the TemporalProcessTimeJoinOperator supports temporal joins based on processing time but has no way for the Probe Side to wait for data from the Build Side in processing time mode. The operator may start processing the Probe Side's data before it has read the data from the Build Side's snapshot phase, so Probe Side records may find nothing to join with, and the output does not meet the user's requirements. For more details, refer to FLINK-19830.

This document proposes a new API and mechanism at the DataStream level that allows a source (e.g., the MySQL CDC Source) to send a watermark telling downstream operators to advance their event time based on the system time. In the temporal join operator, the probe side can then wait until the build side receives this watermark and starts advancing its event time based on the system time.


Terminology and Background

The FLIP will make changes to Flink's watermark and timestamp concepts. To help understand the intuition behind the design, we will introduce the relevant concepts in this section.

Probe Side: The left side of the stream in a temporal join, sometimes referred to as the Fact Table. Usually, the data on the Probe Side doesn't need to be retained after processing.

Build Side: The right side of the stream in a temporal join, which can be a versioned table. It is also known as the Dimension Table. Typically, the Build Side has the latest data for each key.

Watermark: Informs operators that no elements with a timestamp older than or equal to the watermark timestamp should arrive at the operator.

TimestampAssigner: Assigns event time timestamps to elements. These timestamps are used by all functions that operate on event time, such as event time windows. Here is the `TimestampAssigner` interface:

TimestampAssigner
public interface TimestampAssigner<T> {
    long extractTimestamp(T element, long recordTimestamp);
}
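
As an illustration, here is a minimal sketch of a `TimestampAssigner` implementation. The interface is redeclared locally so that the snippet compiles without Flink on the classpath; the `OrderEvent` type and its `createdAtMillis` field are hypothetical and not part of this proposal.

```java
// Local copy of the interface shown above, so the sketch compiles standalone.
interface TimestampAssigner<T> {
    long extractTimestamp(T element, long recordTimestamp);
}

// Hypothetical event type that carries its own creation time in milliseconds.
final class OrderEvent {
    final String orderId;
    final long createdAtMillis;

    OrderEvent(String orderId, long createdAtMillis) {
        this.orderId = orderId;
        this.createdAtMillis = createdAtMillis;
    }
}

// Uses the timestamp embedded in the event and ignores the timestamp that was
// previously attached to the record (for example by the source).
final class OrderEventTimestampAssigner implements TimestampAssigner<OrderEvent> {
    @Override
    public long extractTimestamp(OrderEvent element, long recordTimestamp) {
        return element.createdAtMillis;
    }
}
```

If no such assigner is configured and the source does not attach timestamps itself, the record timestamp stays at `Long.MIN_VALUE`, which is the situation the use case in the Motivation section starts from.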

WatermarkGenerator: Generates watermarks either based on events or periodically. Here is the WatermarkGenerator interface:

WatermarkGenerator
@Public
public interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}
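
To make the two callbacks concrete, the following sketch shows a generator that tolerates a bounded amount of out-of-orderness. `Watermark` and `WatermarkOutput` are simplified stand-ins declared locally (the real Flink types have more members), and `BoundedLatenessGenerator` is a hypothetical name chosen for this example.

```java
// Simplified stand-ins for Flink's types so the sketch compiles standalone.
final class Watermark {
    final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }
}

interface WatermarkOutput {
    void emitWatermark(Watermark watermark);
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Tracks the largest timestamp seen so far and, on each periodic call, emits a
// watermark that trails it by a fixed bound.
final class BoundedLatenessGenerator<T> implements WatermarkGenerator<T> {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestamp;

    BoundedLatenessGenerator(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Offset the start value so the subtraction in onPeriodicEmit cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // Only record progress here; watermarks are emitted periodically.
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // "- 1" because a watermark T states that no element with timestamp <= T follows.
        output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrdernessMillis - 1));
    }
}
```

A generator of this kind derives every watermark from event timestamps, so it cannot make progress when records carry no event time. That gap is what the `useProcessingTime` flag proposed below is meant to cover.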


For Flink DataStream jobs:

  • When creating a source, the user provides a `WatermarkStrategy` to `StreamExecutionEnvironment#fromSource`.
  • If the source supports event time natively (e.g., KafkaSource) or the user provides a custom `TimestampAssigner` in the `WatermarkStrategy` to extract the timestamp from the record, Flink will add the timestamp to the record. Otherwise, the timestamp on the record will be `Long.MIN_VALUE`.
  • If the user uses `NoWatermarkGenerator` in the `WatermarkStrategy`, the job will not generate watermarks. Otherwise, the job will periodically emit watermarks, and the watermark value depends on event time. The output frequency of watermarks is determined by `pipeline.auto-watermark-interval`, with a default value of 200ms.


Public Interfaces

1) Modify `org.apache.flink.api.common.eventtime.Watermark` and `org.apache.flink.streaming.api.watermark.Watermark` by adding a new field `useProcessingTime`, with a default value of false.

Watermark
/**
 * Watermarks are the progress indicators in the data streams. A watermark signifies that no events
 * with a timestamp smaller than or equal to the watermark's time will occur after the watermark.
 *
 * <ul>
 *   <li>A watermark with a timestamp <i>T</i> and useProcessingTime set to false indicates that the
 *       event time of the stream has progressed to time <i>T</i>.
 *   <li>A watermark with a timestamp <i>T</i> and useProcessingTime set to true indicates that the
 *       event time of the stream progresses in synchronization with the system time. The timestamp
 *       <i>T</i> must be less than the current system time. Otherwise, an exception will be thrown.
 * </ul>
 *
 * <p>Watermarks are created at the sources and propagate through the streams and operators.
 *
 * <p>In some cases a watermark is only a heuristic, meaning some events with a lower timestamp may
 * still follow. In that case, it is up to the logic of the operators to decide what to do with the
 * "late events". Operators can for example ignore these late events, route them to a different
 * stream, or send update to their previously emitted results.
 *
 * <p>When a source reaches the end of the input, it emits a final watermark with timestamp {@code
 * Long.MAX_VALUE}, indicating the "end of time".
 *
 * <p>Note: A stream's time starts with a watermark of {@code Long.MIN_VALUE}. That means that all
 * records in the stream with a timestamp of {@code Long.MIN_VALUE} are immediately late.
 *
 * <p>Note: After sending a watermark with useProcessingTime set to true, it is only allowed to send
 * subsequent watermarks with useProcessingTime set to true. Sending a watermark with
 * useProcessingTime set to false will result in an exception.
 */
@Public
public final class Watermark implements Serializable {
    ...

    /**
     * If useProcessingTime is set to false, this is the time of the watermark in milliseconds. If
     * useProcessingTime is set to true, this is the last effective time of the watermark in
     * milliseconds.
     */
    private final long timestamp;

  
    /**
     * If this is true, this watermark indicates the event time of the stream progresses in
     * synchronization with the system time.
     */
    private final boolean useProcessingTime;

    public Watermark(long timestamp) {
      this(timestamp, false);
    }

    public Watermark(long timestamp, boolean useProcessingTime) {
        this.timestamp = timestamp;
        this.useProcessingTime = useProcessingTime;
    }

    /** Returns whether the time of watermark can be determined by the system time. */
    public boolean useProcessingTime() {
        return useProcessingTime;
    }
}

Please note that the update does not change the abstraction of the information carried by the `Watermark` class; it is a natural extension of its current capabilities. When `useProcessingTime` is false, the watermark indicates that the stream has progressed to a specific event time. When `useProcessingTime` is true, it indicates that the stream progresses in synchronization with the system time. In both cases, the watermark indicates the time progression in the data stream.
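
One way to read these semantics is through the eyes of a consumer that asks "how far has event time progressed?". The sketch below is only an illustration under stated assumptions: `Watermark` is a reduced local stand-in for the proposed class, and `StreamTime.currentEventTime` is a hypothetical helper that does not exist in Flink.

```java
import java.util.function.LongSupplier;

// Reduced stand-in for the proposed Watermark class (not the Flink class itself).
final class Watermark {
    final long timestamp;
    final boolean useProcessingTime;

    Watermark(long timestamp, boolean useProcessingTime) {
        this.timestamp = timestamp;
        this.useProcessingTime = useProcessingTime;
    }
}

// Hypothetical helper that derives the stream's current event time from the
// most recently received watermark.
final class StreamTime {
    private StreamTime() {}

    static long currentEventTime(Watermark latest, LongSupplier systemClock) {
        if (!latest.useProcessingTime) {
            // Classic watermark: event time has progressed to the watermark timestamp.
            return latest.timestamp;
        }
        long now = systemClock.getAsLong();
        // The proposal requires the timestamp to be less than the current system time.
        if (latest.timestamp >= now) {
            throw new IllegalStateException(
                    "Watermark timestamp " + latest.timestamp
                            + " is not less than system time " + now);
        }
        // Event time now follows the system clock.
        return now;
    }
}
```

The system clock is passed in as a `LongSupplier` purely so the behaviour can be exercised deterministically; a caller would typically supply `System::currentTimeMillis`.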


2) Update `AbstractStreamOperator/AbstractStreamOperatorV2` to support processing watermarks with `useProcessingTime` set to true:

Original Behavior:

  • `AbstractStreamOperator` and `AbstractStreamOperatorV2` manage and trigger event time and processing time timers through `InternalTimeServiceManager`.

  • The operator calls `InternalTimeServiceManager#advanceWatermark` to advance the event time and trigger event time timers only when it receives a watermark.

Updated Behavior:

  • When receiving a watermark with `useProcessingTime` set to false, the behavior remains the same as before.

  • When receiving a watermark with `useProcessingTime` set to true, the operator starts a `ScheduledThreadPoolExecutor` that triggers the event time timers as the system time progresses.

With this change, once the operator receives a watermark with `useProcessingTime` set to true, it behaves the same as a processing time operator.
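
The updated behavior can be sketched with a heavily simplified timer model. Everything in this block is hypothetical: `EventTimeTimerSketch` is not the `InternalTimeServiceManager`, and instead of starting a real `ScheduledThreadPoolExecutor` it exposes an `onSystemTimeTick()` method that such a scheduler would call periodically, which keeps the example deterministic.

```java
import java.util.PriorityQueue;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

// Hypothetical, heavily simplified model of an operator's event-time timer handling.
final class EventTimeTimerSketch {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private final LongSupplier systemClock;
    private final LongConsumer onTimer;
    private boolean followProcessingTime;

    EventTimeTimerSketch(LongSupplier systemClock, LongConsumer onTimer) {
        this.systemClock = systemClock;
        this.onTimer = onTimer;
    }

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Called for every incoming watermark.
    void processWatermark(long timestamp, boolean useProcessingTime) {
        if (useProcessingTime) {
            // From now on event time follows the system clock. A real operator
            // would start a scheduler here that keeps calling onSystemTimeTick().
            followProcessingTime = true;
            onSystemTimeTick();
        } else {
            if (followProcessingTime) {
                throw new IllegalStateException(
                        "Event-time watermark received after a processing-time watermark");
            }
            // Unchanged behavior: timers fire only when a watermark arrives.
            fireTimersUpTo(timestamp);
        }
    }

    // Called periodically by a scheduler once processing-time mode is active.
    void onSystemTimeTick() {
        if (followProcessingTime) {
            fireTimersUpTo(systemClock.getAsLong());
        }
    }

    private void fireTimersUpTo(long time) {
        while (!timers.isEmpty() && timers.peek() <= time) {
            onTimer.accept(timers.poll());
        }
    }
}
```

In this model, event time timers registered by user code do not need to change: the same timer queue is drained either by watermarks or by the system clock, depending on the last watermark received.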


3) Introduce `WatermarkCharacteristic` enum class

WatermarkCharacteristic
/**
 * WatermarkCharacteristic is used by the source implementation to describe the characteristic of
 * the watermark based on the WatermarkStrategy.
 */
@PublicEvolving
public enum WatermarkCharacteristic {

    /**
     * This implies that only watermarks with useProcessingTime set to false can be sent, and no
     * watermark will be sent when processing time is required. This is the default for all source
     * implementations.
     */
    UNDEFINED,

    /**
     * This implies that the source will send watermarks with useProcessingTime set to false when
     * event time is required and with useProcessingTime set to true when processing time is
     * required.
     */
    ANY_WATERMARK,
}


4) Update the `Source` interface by introducing the `getWatermarkCharacteristic` method

Source
@Public
public interface Source<T, SplitT extends SourceSplit, EnumChkT>
        extends SourceReaderFactory<T, SplitT> {

    /**
     * Get the watermark characteristic of this source.
     *
     * @param watermarkStrategy The watermark strategy of the source.
     * @return the watermark characteristic of this source.
     */
    default WatermarkCharacteristic getWatermarkCharacteristic(
            WatermarkStrategy<?> watermarkStrategy) {
        return WatermarkCharacteristic.UNDEFINED;
    }
}


5) Update Source Implementations

KafkaSource

  • Update the `KafkaSourceReader` to emit a watermark with `useProcessingTime` set to true at the beginning if `NoWatermarkGenerator` is used.

  • Update `KafkaSource` to override the `getWatermarkCharacteristic` method to return `ANY_WATERMARK`.
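
The two KafkaSource changes above could look roughly like the sketch below. It is not Kafka connector code: all types are local stand-ins, `getWatermarkCharacteristic` takes a boolean in place of the `WatermarkStrategy<?>` parameter, and the choice of "system time minus one millisecond" as the initial watermark timestamp is an assumption, since the proposal only requires the timestamp to be less than the current system time.

```java
import java.util.function.LongSupplier;

// Simplified stand-ins that mirror the names in this proposal; this is not Flink code.
enum WatermarkCharacteristic { UNDEFINED, ANY_WATERMARK }

final class Watermark {
    final long timestamp;
    final boolean useProcessingTime;

    Watermark(long timestamp, boolean useProcessingTime) {
        this.timestamp = timestamp;
        this.useProcessingTime = useProcessingTime;
    }
}

interface WatermarkOutput {
    void emitWatermark(Watermark watermark);
}

// Hypothetical reduced Source interface: the boolean replaces WatermarkStrategy<?>.
interface SketchSource {
    default WatermarkCharacteristic getWatermarkCharacteristic(boolean hasWatermarkGenerator) {
        return WatermarkCharacteristic.UNDEFINED;
    }
}

// A source that has not been updated keeps the default.
final class LegacySketchSource implements SketchSource {}

// A source updated along the lines described for KafkaSource.
final class ProcessingTimeAwareSketchSource implements SketchSource {
    @Override
    public WatermarkCharacteristic getWatermarkCharacteristic(boolean hasWatermarkGenerator) {
        return WatermarkCharacteristic.ANY_WATERMARK;
    }
}

// Reader side: announce processing-time progression once, before any record is emitted.
final class SketchSourceReader {
    private final boolean hasWatermarkGenerator;
    private final LongSupplier systemClock;

    SketchSourceReader(boolean hasWatermarkGenerator, LongSupplier systemClock) {
        this.hasWatermarkGenerator = hasWatermarkGenerator;
        this.systemClock = systemClock;
    }

    void start(WatermarkOutput output) {
        if (!hasWatermarkGenerator) {
            // Assumption: any timestamp below the current system time is acceptable.
            output.emitWatermark(new Watermark(systemClock.getAsLong() - 1, true));
        }
    }
}
```

Because the interface method has a default that returns `UNDEFINED`, sources that are not updated continue to compile and behave as before.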

MySqlSource

The update to `MySqlSource` is documented in the appendix.


Proposed Changes

WatermarkToDataOutput

`WatermarkToDataOutput` is responsible for sending the watermark from the `SourceReader` downstream and ensuring that the watermark timestamps sent are monotonically increasing.

Original Behavior:

  • When the `WatermarkToDataOutput` receives a watermark, it checks the timestamp of the watermark. It only sends the watermark downstream if the timestamp of the watermark is greater than the timestamp of the most recently sent watermark.

Updated Behavior:

  • When the `WatermarkToDataOutput` receives a watermark, it checks the timestamp and the `useProcessingTime` field of the watermark. It only sends the watermark downstream if the timestamp of the watermark is greater than the timestamp of the most recently sent watermark or if the `useProcessingTime` field is set to true.

  • If the watermark to be sent has `useProcessingTime` set to true, and the current system time is less than the timestamp of the most recently sent watermark, an exception is thrown.

  • If a watermark with `useProcessingTime` set to true has been previously sent, and the watermark to be sent has `useProcessingTime` set to false, an exception is thrown.

The updated `WatermarkToDataOutput` ensures that while `useProcessingTime` is false, the timestamps of the watermarks it sends are monotonically increasing. It also ensures that a watermark with `useProcessingTime` set to true is always sent downstream and that `useProcessingTime` is never set back to false once it has been set to true.
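
The rules above can be summarized in a short sketch. It is an illustration only: `WatermarkForwarderSketch` is a hypothetical class, not the actual `WatermarkToDataOutput`, `Watermark` is a reduced local stand-in, and the choice of `IllegalStateException` is an assumption because the proposal does not name the exception type.

```java
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Reduced stand-in for the proposed Watermark class (not the Flink class itself).
final class Watermark {
    final long timestamp;
    final boolean useProcessingTime;

    Watermark(long timestamp, boolean useProcessingTime) {
        this.timestamp = timestamp;
        this.useProcessingTime = useProcessingTime;
    }
}

// Hypothetical model of the forwarding checks described for WatermarkToDataOutput.
final class WatermarkForwarderSketch {
    private final LongSupplier systemClock;
    private final Consumer<Watermark> downstream;
    private long lastSentTimestamp = Long.MIN_VALUE;
    private boolean processingTimeMode;

    WatermarkForwarderSketch(LongSupplier systemClock, Consumer<Watermark> downstream) {
        this.systemClock = systemClock;
        this.downstream = downstream;
    }

    void emitWatermark(Watermark watermark) {
        if (watermark.useProcessingTime) {
            // Switching to system time must not move time backwards.
            if (systemClock.getAsLong() < lastSentTimestamp) {
                throw new IllegalStateException(
                        "System time is behind the most recently sent watermark");
            }
            processingTimeMode = true;
            lastSentTimestamp = Math.max(lastSentTimestamp, watermark.timestamp);
            // Processing-time watermarks are always forwarded.
            downstream.accept(watermark);
            return;
        }
        if (processingTimeMode) {
            // useProcessingTime must never go back from true to false.
            throw new IllegalStateException(
                    "Cannot send an event-time watermark after a processing-time watermark");
        }
        if (watermark.timestamp > lastSentTimestamp) {
            lastSentTimestamp = watermark.timestamp;
            downstream.accept(watermark);
        }
        // Otherwise the watermark does not advance time and is dropped, as before.
    }
}
```

Keeping the mode switch one-way means downstream operators only ever need to handle a single transition from watermark-driven to clock-driven event time.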

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users? 
  • If we are changing behavior how will we phase out the older behavior? 
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
