...

The updated `WatermarkToDataOutput` ensures that, while `useProcessingTime` is false, the watermark timestamp is monotonically increasing. It also ensures that a watermark with `useProcessingTime` set to true is sent downstream, and that `useProcessingTime` is never set back to false once it has been set to true.
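The two guarantees above can be sketched as a small filter. This is only an illustration, not the actual `WatermarkToDataOutput` code: `SketchWatermark` and `WatermarkOutputGuard` are hypothetical names, and two details are assumptions made here rather than statements from this proposal: watermarks that do not advance the timestamp are dropped, and every watermark that arrives after the switch to processing time is dropped.

```java
/** Hypothetical stand-in for a watermark that carries the useProcessingTime flag. */
final class SketchWatermark {
    final long timestamp;
    final boolean useProcessingTime;

    SketchWatermark(long timestamp, boolean useProcessingTime) {
        this.timestamp = timestamp;
        this.useProcessingTime = useProcessingTime;
    }
}

/** Hypothetical guard; it only decides whether a watermark may be forwarded. */
final class WatermarkOutputGuard {
    private long maxTimestamp = Long.MIN_VALUE;
    private boolean useProcessingTime = false;

    /** Returns true if the given watermark should be sent downstream. */
    boolean shouldEmit(SketchWatermark watermark) {
        if (useProcessingTime) {
            // Assumption: once the flag is true it stays true, so nothing
            // received afterwards may switch the output back to event time.
            return false;
        }
        if (watermark.useProcessingTime) {
            // The switch to processing time is always forwarded.
            useProcessingTime = true;
            return true;
        }
        if (watermark.timestamp > maxTimestamp) {
            // Event-time watermarks are forwarded only when they advance.
            maxTimestamp = watermark.timestamp;
            return true;
        }
        return false;
    }
}
```

With this sketch, the sequence `(5, false)`, `(3, false)`, `(7, true)`, `(9, false)` forwards the first and third watermarks and drops the other two.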


StatusWatermarkValve

The `StatusWatermarkValve` calculates the current watermark for an input that has multiple input channels and invokes the `processWatermark` method of the operator. It ensures that the watermark timestamp passed to `processWatermark` is the minimum timestamp among all the input channels of that input, and that this timestamp is monotonically increasing.

Original behavior:

  • It keeps track of the maximum watermark timestamp seen and the status (active/idle, aligned/unaligned) for each input channel.

    • Each input channel can be either active or idle. `StatusWatermarkValve` updates the status of each input channel when it receives a `WatermarkStatus` from upstream.

    • Each input channel can be either aligned or unaligned. An input channel is considered aligned only when it is active and its watermark is greater than or equal to the last watermark timestamp of the input.

  • It gets the minimum watermark timestamp among all the aligned input channels and uses it as the watermark for the input. If the new watermark timestamp is greater than the previous watermark timestamp, it invokes the `processWatermark` method.
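The original behavior can be sketched as follows. This is a simplified illustration and not the real `StatusWatermarkValve`: the class `OriginalValveSketch` and its method names are hypothetical, every channel is assumed to start active and aligned with a watermark of `Long.MIN_VALUE`, and the case where all channels become idle is not handled.

```java
import java.util.Arrays;
import java.util.OptionalLong;

/** Hypothetical, simplified model of the original valve behavior. */
final class OriginalValveSketch {
    private final long[] channelWatermarks;
    private final boolean[] active;
    private final boolean[] aligned;
    private long lastOutputWatermark = Long.MIN_VALUE;

    OriginalValveSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        active = new boolean[numChannels];
        aligned = new boolean[numChannels];
        // Assumption: channels start active and aligned at Long.MIN_VALUE.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        Arrays.fill(active, true);
        Arrays.fill(aligned, true);
    }

    /** Returns the timestamp to pass to processWatermark, if the input advanced. */
    OptionalLong onWatermark(int channel, long timestamp) {
        if (!active[channel] || timestamp <= channelWatermarks[channel]) {
            return OptionalLong.empty();
        }
        // Track the maximum watermark seen on this channel.
        channelWatermarks[channel] = timestamp;
        if (timestamp >= lastOutputWatermark) {
            // The channel has caught up with the input's last watermark.
            aligned[channel] = true;
        }
        return advance();
    }

    /** Applies an active/idle status change received from upstream. */
    OptionalLong onStatus(int channel, boolean isActive) {
        active[channel] = isActive;
        aligned[channel] = isActive && channelWatermarks[channel] >= lastOutputWatermark;
        return advance();
    }

    /** Emits the minimum over aligned channels if it exceeds the last output. */
    private OptionalLong advance() {
        long min = Long.MAX_VALUE;
        boolean anyAligned = false;
        for (int i = 0; i < aligned.length; i++) {
            if (aligned[i]) {
                anyAligned = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        if (anyAligned && min > lastOutputWatermark) {
            lastOutputWatermark = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }
}
```

For example, with two channels, a watermark of 10 on channel 0 produces no output because channel 1 is still at `Long.MIN_VALUE`; a following watermark of 5 on channel 1 advances the input to 5.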

Updated behavior:

  • It keeps track of the maximum watermark timestamp seen, the status (active/idle, aligned/unaligned), and `useProcessingTime` for each input channel.

    • Each input channel can be either active or idle. `StatusWatermarkValve` updates the status of each input channel when it receives a `WatermarkStatus` from upstream. If the input channel has `useProcessingTime` set to true, it is considered active.

    • Each input channel can be either aligned or unaligned. If `useProcessingTime` of the last watermark of the input is false, an input channel is considered aligned only when it is active and its watermark is greater than or equal to the last watermark timestamp of the input. If `useProcessingTime` of the last watermark of the input is true, an input channel is considered aligned only when it is active and its own `useProcessingTime` is true.

  • If not all the aligned input channels have `useProcessingTime` set to true, the watermark of the input is the minimum watermark timestamp among all the aligned input channels. If all the aligned input channels have `useProcessingTime` set to true, the watermark of the input has `useProcessingTime` set to true.
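The updated active and aligned rules can be written as two predicates. This is a minimal sketch of the rules as stated in the list above, not the actual implementation: `ChannelState` and `UpdatedAlignmentSketch` are hypothetical names, and the last watermark of the input is passed in as two plain parameters for brevity.

```java
/** Hypothetical per-channel state tracked by the valve. */
final class ChannelState {
    long timestamp = Long.MIN_VALUE;   // maximum watermark timestamp seen
    boolean active = true;             // last WatermarkStatus from upstream
    boolean useProcessingTime = false; // flag of the last watermark received
}

/** Hypothetical helper holding the updated status rules. */
final class UpdatedAlignmentSketch {

    /** A channel with useProcessingTime set to true is considered active. */
    static boolean isActive(ChannelState channel) {
        return channel.active || channel.useProcessingTime;
    }

    /**
     * Alignment depends on the last watermark emitted for the input:
     * in event time the channel must have caught up with its timestamp,
     * in processing time the channel must itself use processing time.
     */
    static boolean isAligned(
            ChannelState channel, long lastTimestamp, boolean lastUseProcessingTime) {
        if (!isActive(channel)) {
            return false;
        }
        if (lastUseProcessingTime) {
            return channel.useProcessingTime;
        }
        return channel.timestamp >= lastTimestamp;
    }
}
```

Keeping the rules as pure functions of the channel state and the last output makes each case in the list easy to check in isolation.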

After the update, the effective watermark timestamp passed to the `processWatermark` method is still the minimum among all input channels of that input, and it is still monotonically increasing.


The chart below shows the effective watermark of the input given the watermark of the two input channels.

| InputChannel1 \ InputChannel2 | currentTimestamp = t2, useProcessingTime = true | currentTimestamp = t2, useProcessingTime = false |
| --- | --- | --- |
| currentTimestamp = t1, useProcessingTime = true | Watermark(MIN(t1, t2), true) | Watermark(t2, false) |
| currentTimestamp = t1, useProcessingTime = false | Watermark(t1, false) | Watermark(MIN(t1, t2), false) |
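The four cases above can be reproduced with a small two-channel combine function. This is an illustrative sketch only: `EffectiveWatermark` and `combine` are hypothetical names, and both channels are assumed to be active and aligned.

```java
/** Hypothetical value type for the effective watermark of a channel or input. */
final class EffectiveWatermark {
    final long timestamp;
    final boolean useProcessingTime;

    EffectiveWatermark(long timestamp, boolean useProcessingTime) {
        this.timestamp = timestamp;
        this.useProcessingTime = useProcessingTime;
    }

    /** Combines the watermarks of two active, aligned input channels. */
    static EffectiveWatermark combine(EffectiveWatermark ch1, EffectiveWatermark ch2) {
        if (ch1.useProcessingTime == ch2.useProcessingTime) {
            // Both flags agree: take the minimum timestamp and keep the flag.
            return new EffectiveWatermark(
                    Math.min(ch1.timestamp, ch2.timestamp), ch1.useProcessingTime);
        }
        // Flags differ: the channel still on event time determines the result.
        return ch1.useProcessingTime ? ch2 : ch1;
    }
}
```

For instance, combining `(t1 = 3, true)` with `(t2 = 8, false)` gives `Watermark(8, false)`, matching the top-right case.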


Compatibility, Deprecation, and Migration Plan

...