...

This document proposes a new API and mechanism in the DataStream API that allows a source (e.g., the MySQL CDC Source) to send a watermark that lets downstream operators advance their event time based on the system time. In the temporal join operator, the probe side can then wait for the build side until the build side receives this watermark and starts advancing its event time based on the system time.

...

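```java
// TimestampAssigner
public interface TimestampAssigner<T> {
    // Returns the event-time timestamp of the element. recordTimestamp is the timestamp
    // already attached to the record, or a negative value (Long.MIN_VALUE) if none was assigned.
    long extractTimestamp(T element, long recordTimestamp);
}
```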

WatermarkGenerator: Generates watermarks either based on events or periodically. Here is the WatermarkGenerator interface:

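```java
// WatermarkGenerator (package org.apache.flink.api.common.eventtime)
import org.apache.flink.annotation.Public;

@Public
public interface WatermarkGenerator<T> {
    // Called for every event; may emit a watermark derived from the event.
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    // Called periodically (every pipeline.auto-watermark-interval); may emit a watermark.
    void onPeriodicEmit(WatermarkOutput output);
}
```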
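To illustrate how the two callbacks cooperate, here is a minimal sketch of a bounded-out-of-orderness generator: `onEvent` only records progress, and `onPeriodicEmit` turns that progress into a watermark. So that it compiles without Flink on the classpath, it declares hypothetical stand-ins `SimpleWatermarkOutput` and `SimpleWatermarkGenerator` that mirror the shape of the interfaces above; `BoundedLagGenerator` and `CollectingOutput` are likewise illustrative names. Flink provides a comparable built-in through `WatermarkStrategy.forBoundedOutOfOrderness`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's WatermarkOutput (only the method used here).
interface SimpleWatermarkOutput {
    void emitWatermark(long timestamp);
}

// Hypothetical mirror of the WatermarkGenerator interface shown above.
interface SimpleWatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, SimpleWatermarkOutput output);
    void onPeriodicEmit(SimpleWatermarkOutput output);
}

// Tracks the largest event timestamp seen and, on each periodic call,
// emits a watermark that lags it by a fixed bound.
class BoundedLagGenerator<T> implements SimpleWatermarkGenerator<T> {
    private final long maxLagMillis;
    private long maxTimestamp;

    BoundedLagGenerator(long maxLagMillis) {
        this.maxLagMillis = maxLagMillis;
        // Start low enough that a watermark emitted before any event does not underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxLagMillis + 1;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, SimpleWatermarkOutput output) {
        // Event-driven part: remember progress, emit nothing per record.
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(SimpleWatermarkOutput output) {
        // Periodic part: no more events at or before this timestamp are expected.
        output.emitWatermark(maxTimestamp - maxLagMillis - 1);
    }
}

// Collects emitted watermarks so the behaviour can be inspected.
class CollectingOutput implements SimpleWatermarkOutput {
    final List<Long> emitted = new ArrayList<>();

    @Override
    public void emitWatermark(long timestamp) {
        emitted.add(timestamp);
    }
}
```

A late event (950 after 1000) does not move the watermark back, and nothing is emitted until the periodic callback runs.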

...

  • When creating a source, the user provides a `WatermarkStrategy` to `StreamExecutionEnvironment#fromSource`.
  • If the source supports event time natively (e.g., KafkaSource) or the user provides a custom `TimestampAssigner` in the `WatermarkStrategy` to extract the timestamp from the record, Flink attaches that timestamp to the record. Otherwise, the timestamp on the record is `Long.MIN_VALUE`.
  • If the user uses `NoWatermarksGenerator` (i.e., `WatermarkStrategy.noWatermarks()`) in the `WatermarkStrategy`, the job does not generate watermarks. Otherwise, the job periodically emits watermarks whose values depend on event time. The emission interval is determined by `pipeline.auto-watermark-interval`, with a default value of 200ms.
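The timestamp rules above can be summarized in a few lines. The sketch below is a hypothetical model, not Flink code: `SimpleTimestampAssigner` mirrors `TimestampAssigner#extractTimestamp` so the example compiles standalone, and `SourceTimestamping`, `timestampFor`, and `periodicEmitCount` are illustrative names. It assumes that a source without native event time passes `Long.MIN_VALUE` as the record timestamp and that, without a custom assigner, that value is kept unchanged.

```java
// Hypothetical mirror of TimestampAssigner#extractTimestamp, so the sketch compiles without Flink.
interface SimpleTimestampAssigner<T> {
    long extractTimestamp(T element, long recordTimestamp);
}

// Hypothetical helper modelling the rules described above; not a Flink class.
final class SourceTimestamping {
    // Marker for "this record carries no timestamp".
    static final long NO_TIMESTAMP = Long.MIN_VALUE;
    // Default of pipeline.auto-watermark-interval, in milliseconds.
    static final long DEFAULT_AUTO_WATERMARK_INTERVAL_MS = 200L;

    private SourceTimestamping() {}

    // nativeTimestamp is what the source attached itself (NO_TIMESTAMP if it has no event time).
    // A user assigner, if present, sees that value and may replace it; otherwise it is kept.
    static <T> long timestampFor(T record, long nativeTimestamp, SimpleTimestampAssigner<T> assigner) {
        return assigner == null ? nativeTimestamp : assigner.extractTimestamp(record, nativeTimestamp);
    }

    // Number of onPeriodicEmit calls within elapsedMillis of wall-clock time.
    static long periodicEmitCount(long elapsedMillis, long intervalMillis) {
        return elapsedMillis / intervalMillis;
    }
}
```

With the default interval, a generator's `onPeriodicEmit` is invoked about five times per second of wall-clock time.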


Public Interfaces

1) Modify `org.apache.flink.api.common.eventtime.Watermark` and `org.apache.flink.streaming.api.watermark.Watermark` by adding a new field `useProcessingTime`, with a default value of false.

...

Please note that the update does not change the abstraction of the information passed by the `Watermark` class but rather provides a natural extension of its current capabilities. When `useProcessingTime` is false, the watermark indicates that the stream has progressed to a specific event time. When `useProcessingTime` is true, it indicates that the stream progresses in synchronization with the system time. In both cases, the watermark indicates the progression of time in the data stream.
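As a minimal sketch of these semantics, the hypothetical value class below carries a timestamp plus the proposed flag. `FlaggedWatermark` and `progressedTo` are illustrative names and not part of either `Watermark` class; the single-argument constructor models the default of `false`, which keeps existing event-time behavior.

```java
import java.util.Objects;

// Hypothetical, simplified model of a Watermark carrying the proposed flag.
final class FlaggedWatermark {
    private final long timestamp;
    private final boolean useProcessingTime;

    // Existing call sites keep event-time semantics: the flag defaults to false.
    FlaggedWatermark(long timestamp) {
        this(timestamp, false);
    }

    FlaggedWatermark(long timestamp, boolean useProcessingTime) {
        this.timestamp = timestamp;
        this.useProcessingTime = useProcessingTime;
    }

    long getTimestamp() {
        return timestamp;
    }

    boolean useProcessingTime() {
        return useProcessingTime;
    }

    // The point in time the stream has progressed to, given the current system time.
    long progressedTo(long systemTimeMillis) {
        return useProcessingTime ? systemTimeMillis : timestamp;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof FlaggedWatermark)) return false;
        FlaggedWatermark other = (FlaggedWatermark) o;
        return timestamp == other.timestamp && useProcessingTime == other.useProcessingTime;
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, useProcessingTime);
    }
}
```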


2) Update `AbstractStreamOperator/AbstractStreamOperatorV2` to support processing watermarks with `useProcessingTime` set to true:

...

After this change, when an operator receives a watermark with `useProcessingTime` set to true, it behaves the same as a processing-time operator.
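One way to picture this is a timer service whose notion of "now" switches from the last event-time watermark to the system clock once a watermark with `useProcessingTime` set to true arrives. The sketch below is hypothetical: `HybridTimerService`, `processWatermark`, and `onProcessingTimeTick` are illustrative and not Flink's `InternalTimerService` API, and firing timers from a periodic tick is an assumption about how processing-time behavior would be driven.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.LongSupplier;

// Hypothetical model of an operator's timer handling under the proposal.
final class HybridTimerService {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private final LongSupplier systemClock;
    private long currentWatermark = Long.MIN_VALUE;
    private boolean useProcessingTime = false;

    HybridTimerService(LongSupplier systemClock) {
        this.systemClock = systemClock;
    }

    void registerTimer(long time) {
        timers.add(time);
    }

    // Called for each incoming watermark; returns the timers that fired.
    List<Long> processWatermark(long timestamp, boolean useProcessingTime) {
        // Once switched to processing time, the operator does not switch back.
        this.useProcessingTime |= useProcessingTime;
        if (!this.useProcessingTime) {
            currentWatermark = Math.max(currentWatermark, timestamp);
        }
        return fireDueTimers();
    }

    // Periodic wall-clock tick; only fires timers in processing-time mode.
    List<Long> onProcessingTimeTick() {
        if (!useProcessingTime) {
            return new ArrayList<>();
        }
        return fireDueTimers();
    }

    // Event time, or the system time (never behind the last watermark) in processing-time mode.
    long currentTime() {
        return useProcessingTime ? Math.max(currentWatermark, systemClock.getAsLong()) : currentWatermark;
    }

    private List<Long> fireDueTimers() {
        long now = currentTime();
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= now) {
            fired.add(timers.poll());
        }
        return fired;
    }
}
```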


3) Introduce `WatermarkCharacteristic` enum class

...

The updated `WatermarkToDataOutput` ensures that when `useProcessingTime` is false, the watermark's timestamp is monotonically increasing. It also ensures that a watermark with `useProcessingTime` set to true is sent downstream and that `useProcessingTime` is never set back to false once it has been set to true.
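The two guarantees can be sketched as a small guard in front of the downstream output. `WatermarkForwarder` is a hypothetical name, not the actual `WatermarkToDataOutput` code; dropping later event-time watermarks once processing time is in effect is one possible reading of "never set back to false", assumed here for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical guard modelling the guarantees described above.
final class WatermarkForwarder {
    // What was sent downstream, recorded as strings for easy inspection.
    final List<String> forwarded = new ArrayList<>();
    private long maxTimestamp = Long.MIN_VALUE;
    private boolean useProcessingTime = false;

    void emit(long timestamp, boolean useProcessingTime) {
        if (this.useProcessingTime) {
            // Already in processing-time mode: the flag never goes back to false.
            return;
        }
        if (useProcessingTime) {
            // Always forward the switch to processing time.
            this.useProcessingTime = true;
            forwarded.add("processing-time");
            return;
        }
        if (timestamp > maxTimestamp) {
            // Event-time watermarks are only forwarded if they move forward.
            maxTimestamp = timestamp;
            forwarded.add("event-time:" + timestamp);
        }
    }
}
```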

...

The `StatusWatermarkValve` is used to calculate the current watermark for an input that has multiple input channels and to invoke the `processWatermark` method of the operator. `StatusWatermarkValve` ensures that the watermark timestamp passed to `processWatermark` is the minimum timestamp among all the input channels of that input and that it is monotonically increasing.

Original behavior:

  • It keeps track of the maximum watermark timestamp seen and the status (active/idle, aligned/unaligned) for each input channel.

    • Each input channel can be either active or idle. `StatusWatermarkValve` updates the status of each input channel when it receives a `WatermarkStatus` from upstream.

    • Each input channel can be either aligned or unaligned. An input channel is only considered aligned when it is active and its watermark is greater than or equal to the last watermark timestamp of the input.

  • It gets the minimum watermark timestamp among all the aligned input channels and uses it as the watermark for the input. If the new watermark timestamp is greater than the previous watermark timestamp, it invokes the `processWatermark` method.
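The original valve behavior described above can be modelled as follows. This is a simplified, hypothetical sketch (`SimpleWatermarkValve`, `onWatermark`, and `onStatus` are illustrative names, not Flink's `StatusWatermarkValve` API): alignment is derived on the fly from "active and not behind the input's last watermark", and a present return value stands for a call to `processWatermark`.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Simplified, hypothetical model of StatusWatermarkValve's original behaviour.
final class SimpleWatermarkValve {
    private final long[] maxSeen;    // max watermark per input channel
    private final boolean[] active;  // active (true) or idle (false) per channel
    private long lastOutput = Long.MIN_VALUE;

    SimpleWatermarkValve(int numChannels) {
        maxSeen = new long[numChannels];
        Arrays.fill(maxSeen, Long.MIN_VALUE);
        active = new boolean[numChannels];
        Arrays.fill(active, true);
    }

    // A watermark arrived on a channel; returns the value to pass to processWatermark, if any.
    OptionalLong onWatermark(int channel, long timestamp) {
        maxSeen[channel] = Math.max(maxSeen[channel], timestamp);
        return recompute();
    }

    // A WatermarkStatus (active/idle) arrived on a channel.
    OptionalLong onStatus(int channel, boolean isActive) {
        active[channel] = isActive;
        return recompute();
    }

    // Aligned: active and caught up with the input's last emitted watermark.
    private boolean aligned(int channel) {
        return active[channel] && maxSeen[channel] >= lastOutput;
    }

    private OptionalLong recompute() {
        long min = Long.MAX_VALUE;
        boolean anyAligned = false;
        for (int i = 0; i < maxSeen.length; i++) {
            if (aligned(i)) {
                anyAligned = true;
                min = Math.min(min, maxSeen[i]);
            }
        }
        // Only move forward: the input watermark must be monotonically increasing.
        if (anyAligned && min > lastOutput) {
            lastOutput = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }
}
```

A channel that comes back from idle with an old watermark stays unaligned until it catches up, so it cannot pull the input watermark backwards.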

...

The `IndexedCombinedWatermarkStatus` represents the combined value and status of a watermark for a set number of input partial watermarks. The operator advances the event time of `timeServiceManager` based on the combined watermark. `IndexedCombinedWatermarkStatus` ensures that the watermark timestamp of an operator with multiple inputs is the minimum timestamp among all the inputs and that it is monotonically increasing.

Original behavior:

  • It keeps track of the maximum watermark timestamp seen and the status (active/idle) for each input.

    • Each input can be either active or idle. `IndexedCombinedWatermarkStatus` updates the status of an input when it receives a `WatermarkStatus` from that input.

  • It gets the minimum watermark timestamp among all the active inputs and uses it as the watermark of the operator. If the new watermark timestamp is greater than the previous watermark timestamp, it advances the event time of `timeServiceManager`.
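A minimal sketch of this original combining rule, for example across the two inputs of a join operator. `CombinedWatermark`, `updateWatermark`, and `updateStatus` are hypothetical names, not the `IndexedCombinedWatermarkStatus` API; a present return value stands for advancing the event time of `timeServiceManager`.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Simplified, hypothetical model of IndexedCombinedWatermarkStatus (original behaviour).
final class CombinedWatermark {
    private final long[] inputWatermarks;  // max watermark seen per input
    private final boolean[] inputActive;   // active (true) or idle (false) per input
    private long combined = Long.MIN_VALUE;

    CombinedWatermark(int numInputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
        inputActive = new boolean[numInputs];
        Arrays.fill(inputActive, true);
    }

    // Returns the new operator event time if it advanced.
    OptionalLong updateWatermark(int input, long timestamp) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], timestamp);
        return recompute();
    }

    OptionalLong updateStatus(int input, boolean active) {
        inputActive[input] = active;
        return recompute();
    }

    private OptionalLong recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (inputActive[i]) {
                anyActive = true;
                min = Math.min(min, inputWatermarks[i]);
            }
        }
        // Advance only if the minimum over active inputs moved forward.
        if (anyActive && min > combined) {
            combined = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }
}
```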

...

  • It keeps track of the maximum watermark timestamp seen, the status (active/idle), and the `useProcessingTime` flag for each input.

    • Each input can be either active or idle. `IndexedCombinedWatermarkStatus` updates the status of an input when it receives a `WatermarkStatus` from that input. If `useProcessingTime` of an input is set to true, that input is always considered active.

  • If not all the active inputs have `useProcessingTime` set to true, the event time of the operator is the minimum watermark timestamp among all the active inputs. If all the active inputs have `useProcessingTime` set to true, the event time of the operator is in synchronization with the system time.
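The updated rule can be sketched for the temporal join case, where input 0 is the probe side (event time) and input 1 is the build side (a CDC source that switches to processing time). All names are hypothetical. The sketch assumes that an input with `useProcessingTime` set to true contributes the current system time to the minimum; with that assumption, "all active inputs use processing time" naturally yields the system time, and a mixed set yields the probe side's watermark whenever it lags the clock.

```java
import java.util.Arrays;
import java.util.function.LongSupplier;

// Hypothetical model of the updated IndexedCombinedWatermarkStatus behaviour.
final class ProcTimeAwareCombinedWatermark {
    private final long[] watermarks;
    private final boolean[] active;
    private final boolean[] useProcessingTime;
    private final LongSupplier systemClock;
    private long eventTime = Long.MIN_VALUE;

    ProcTimeAwareCombinedWatermark(int numInputs, LongSupplier systemClock) {
        watermarks = new long[numInputs];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        active = new boolean[numInputs];
        Arrays.fill(active, true);
        useProcessingTime = new boolean[numInputs];
        this.systemClock = systemClock;
    }

    void onWatermark(int input, long timestamp, boolean procTime) {
        if (procTime) {
            useProcessingTime[input] = true; // never reset to false
            active[input] = true;
        } else if (!useProcessingTime[input]) {
            watermarks[input] = Math.max(watermarks[input], timestamp);
        }
    }

    void onStatus(int input, boolean isActive) {
        // An input that follows processing time is always considered active.
        active[input] = isActive || useProcessingTime[input];
    }

    // Current event time of the operator; never moves backwards.
    long currentEventTime() {
        long now = systemClock.getAsLong();
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!active[i]) {
                continue;
            }
            anyActive = true;
            // Assumption: a processing-time input contributes the current system time.
            min = Math.min(min, useProcessingTime[i] ? now : watermarks[i]);
        }
        if (anyActive && min > eventTime) {
            eventTime = min;
        }
        return eventTime;
    }
}
```

Until the build side sends its processing-time watermark, its `Long.MIN_VALUE` entry holds the operator back, which is how the probe side waits for the build side.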


TemporalRowTimeJoinOperator

...

The modifications made to the existing Watermark functionality are backward compatible, because we only add new APIs, which do not cause existing code that handles Watermarks to fail. Additionally, the default value of the `useProcessingTime` field in Watermark instances is false, preserving the existing semantics.

With the updates to `AbstractStreamOperator/AbstractStreamOperatorV2` proposed in this FLIP, all operators can support Watermarks with `useProcessingTime` and correctly trigger the operator's timers based on either event time or system time. For sources that haven't been updated, the Watermarks they send always have `useProcessingTime` set to false. In this case, the behavior of the operators remains unchanged, ensuring compatibility with existing jobs.

...

  • org.apache.flink.streaming.api.windowing.assigners.WindowAssigner

    • TumblingEventTimeWindows vs TumblingProcessingTimeWindows

    • SlidingEventTimeWindows vs SlidingProcessingTimeWindows

    • DynamicEventTimeSessionWindows vs DynamicProcessingTimeSessionWindows

    • EventTimeSessionWindows vs ProcessingTimeSessionWindows

...