...

  • When the `WatermarkToDataOutput` receives a watermark, it checks the watermark's timestamp and its `useProcessingTime` field. It sends the watermark downstream only if its timestamp is greater than the timestamp of the most recently sent watermark, or if its `useProcessingTime` field is set to true.

  • If the watermark to be sent has `useProcessingTime` set to true, and the current system time is less than the timestamp of the most recently sent watermark, an exception is thrown.

  • If a watermark with `useProcessingTime` set to true has been sent previously, and the watermark to be sent has `useProcessingTime` set to false, an exception is thrown.
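The three forwarding rules above can be sketched in Java as follows. This is a simplified model, not Flink's `WatermarkToDataOutput`. `SketchWatermark` and `WatermarkForwardingSketch` are hypothetical classes. The clock is injected as a `LongSupplier` so that the system-time check can be tested. The sketch also assumes that the "most recently sent" timestamp is simply the timestamp carried by the last forwarded watermark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical minimal watermark: a timestamp plus the useProcessingTime flag.
final class SketchWatermark {
    final long timestamp;
    final boolean useProcessingTime;

    SketchWatermark(long timestamp, boolean useProcessingTime) {
        this.timestamp = timestamp;
        this.useProcessingTime = useProcessingTime;
    }
}

// Hypothetical model of the forwarding rules described for WatermarkToDataOutput.
// The injected clock is an assumption made so the rules can be tested deterministically.
final class WatermarkForwardingSketch {
    private final LongSupplier clock;
    private final List<SketchWatermark> sent = new ArrayList<>();
    private long lastSentTimestamp = Long.MIN_VALUE;
    private boolean processingTimeSent = false;

    WatermarkForwardingSketch(LongSupplier clock) {
        this.clock = clock;
    }

    /** Returns true if the watermark was forwarded downstream, false if it was dropped. */
    boolean emitWatermark(SketchWatermark wm) {
        if (wm.useProcessingTime) {
            // The system clock must not be behind the most recently sent watermark.
            if (clock.getAsLong() < lastSentTimestamp) {
                throw new IllegalStateException("System time is behind the last sent watermark");
            }
        } else {
            // Once a processing-time watermark was sent, event-time watermarks are rejected.
            if (processingTimeSent) {
                throw new IllegalStateException("Cannot switch back from processing time to event time");
            }
            // Event-time watermarks are forwarded only if they advance the timestamp.
            if (wm.timestamp <= lastSentTimestamp) {
                return false;
            }
        }
        sent.add(wm);
        // Assumption: the recorded timestamp is the forwarded watermark's own timestamp.
        lastSentTimestamp = wm.timestamp;
        processingTimeSent |= wm.useProcessingTime;
        return true;
    }

    List<SketchWatermark> sent() {
        return sent;
    }
}
```

Keeping `processingTimeSent` as a one-way flag is what makes the switch to processing time irreversible in this sketch.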

...

The `StatusWatermarkValve` is used to calculate the current watermark for an input that has multiple input channels, and to invoke the operator's `processWatermark` method. `StatusWatermarkValve` ensures that the watermark timestamp passed to `processWatermark` is the minimum timestamp among all the input channels of that input, and that it is monotonically increasing.

Original behavior:

  • It keeps track of the maximum watermark timestamp seen and the status (active/idle, aligned/unaligned) of each input channel.

    • Each input channel can be either active or idle. `StatusWatermarkValve` updates the status of each input channel when it receives `WatermarkStatus` from upstream.

    • Each input channel can be either aligned or unaligned. An input channel is considered aligned only when it is active and its watermark is greater than or equal to the last watermark timestamp of the input.

  • It gets the minimum watermark timestamp among all the aligned input channels and uses it as the watermark for the input. If the new watermark timestamp is greater than the previous watermark timestamp, it invokes the `processWatermark` method.
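The original rules can be sketched as a small Java class. This is a hypothetical, simplified re-implementation, not Flink's `StatusWatermarkValve`. It models only per-channel maximum watermarks, the active/idle flag, and the "aligned" check. It does not propagate `WatermarkStatus` downstream. `OriginalValveSketch` and its method names are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.function.LongConsumer;

// Hypothetical, simplified model of the original StatusWatermarkValve rules.
final class OriginalValveSketch {
    private final long[] channelWatermark; // maximum watermark seen per channel
    private final boolean[] channelActive; // true = active, false = idle
    private final LongConsumer processWatermark;
    private long lastInputWatermark = Long.MIN_VALUE;

    OriginalValveSketch(int numChannels, LongConsumer processWatermark) {
        this.channelWatermark = new long[numChannels];
        this.channelActive = new boolean[numChannels];
        Arrays.fill(channelWatermark, Long.MIN_VALUE);
        Arrays.fill(channelActive, true);
        this.processWatermark = processWatermark;
    }

    void inputWatermark(int channel, long timestamp) {
        channelWatermark[channel] = Math.max(channelWatermark[channel], timestamp);
        recomputeInputWatermark();
    }

    void inputWatermarkStatus(int channel, boolean active) {
        channelActive[channel] = active;
        recomputeInputWatermark();
    }

    // Aligned = active and not behind the input's last watermark.
    private boolean isAligned(int channel) {
        return channelActive[channel] && channelWatermark[channel] >= lastInputWatermark;
    }

    private void recomputeInputWatermark() {
        long min = Long.MAX_VALUE;
        boolean hasAligned = false;
        for (int c = 0; c < channelWatermark.length; c++) {
            if (isAligned(c)) {
                hasAligned = true;
                min = Math.min(min, channelWatermark[c]);
            }
        }
        // Only strictly increasing watermarks reach processWatermark.
        if (hasAligned && min > lastInputWatermark) {
            lastInputWatermark = min;
            processWatermark.accept(min);
        }
    }
}
```

Consider a channel that goes idle and later becomes active again while still behind the input's watermark. It stays unaligned until it catches up, so it cannot pull the input watermark backwards.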

Updated behavior:

  • It keeps track of the maximum watermark timestamp seen, the status (active/idle, aligned/unaligned), and `useProcessingTime` of each input channel.

    • Each input channel can be either active or idle. `StatusWatermarkValve` updates the status of each input channel when it receives `WatermarkStatus` from upstream. If the input channel has `useProcessingTime` set to true, it is considered active.

    • Each input channel can be either aligned or unaligned. If the last watermark of the input has `useProcessingTime` set to false, an input channel is considered aligned only when it is active and its watermark is greater than or equal to the last watermark timestamp of the input. If the last watermark of the input has `useProcessingTime` set to true, an input channel is considered aligned only when it is active and its `useProcessingTime` is set to true.

  • If not all the aligned input channels have `useProcessingTime` set to true, the watermark of the input is the minimum watermark timestamp among all the aligned input channels. If all the aligned input channels have `useProcessingTime` set to true, the watermark of the input has `useProcessingTime` set to true.

After the update, we can still ensure that the effective watermark timestamp passed to the `processWatermark` method is the minimum among all input channels of that input, and that it is monotonically increasing.
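The updated rules can be sketched by extending the previous model with a per-channel `useProcessingTime` flag. The sketch rests on several assumptions that the text does not spell out, and they are labeled in the code comments:

  • A channel with `useProcessingTime` set to true has an effective timestamp equal to the current system time, which is injected as a `LongSupplier`.
  • The input switches to a processing-time watermark once, using the current time as its timestamp.
  • Such a channel never returns to event time, as the `WatermarkToDataOutput` rules require.

`UpdatedValveSketch` and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;

// Hypothetical, simplified model of the updated StatusWatermarkValve rules.
final class UpdatedValveSketch {
    private final long[] channelWatermark;      // max event-time watermark per channel
    private final boolean[] channelActive;      // active/idle from WatermarkStatus
    private final boolean[] useProcessingTime;  // sticky per-channel flag
    private final LongSupplier clock;
    private final BiConsumer<Long, Boolean> processWatermark; // (timestamp, useProcessingTime)
    private long lastTimestamp = Long.MIN_VALUE;
    private boolean lastUseProcessingTime = false;

    UpdatedValveSketch(int n, LongSupplier clock, BiConsumer<Long, Boolean> processWatermark) {
        channelWatermark = new long[n];
        channelActive = new boolean[n];
        useProcessingTime = new boolean[n];
        Arrays.fill(channelWatermark, Long.MIN_VALUE);
        Arrays.fill(channelActive, true);
        this.clock = clock;
        this.processWatermark = processWatermark;
    }

    void inputWatermark(int c, long timestamp, boolean procTime) {
        if (procTime) {
            useProcessingTime[c] = true; // assumption: never reverts to event time
        } else {
            channelWatermark[c] = Math.max(channelWatermark[c], timestamp);
        }
        recompute();
    }

    void inputWatermarkStatus(int c, boolean active) {
        channelActive[c] = active;
        recompute();
    }

    // A channel with useProcessingTime set to true counts as active.
    private boolean isActive(int c) {
        return channelActive[c] || useProcessingTime[c];
    }

    // Assumption: a processing-time channel's effective timestamp is the current system time.
    private long effective(int c) {
        return useProcessingTime[c] ? clock.getAsLong() : channelWatermark[c];
    }

    private boolean isAligned(int c) {
        if (!isActive(c)) {
            return false;
        }
        return lastUseProcessingTime ? useProcessingTime[c] : effective(c) >= lastTimestamp;
    }

    private void recompute() {
        boolean anyAligned = false;
        boolean allProcessingTime = true;
        long min = Long.MAX_VALUE;
        for (int c = 0; c < channelActive.length; c++) {
            if (!isAligned(c)) {
                continue;
            }
            anyAligned = true;
            allProcessingTime &= useProcessingTime[c];
            min = Math.min(min, effective(c));
        }
        if (!anyAligned) {
            return;
        }
        if (allProcessingTime) {
            // Assumption: switch to a processing-time watermark once, stamped with the current time.
            if (!lastUseProcessingTime) {
                lastUseProcessingTime = true;
                lastTimestamp = clock.getAsLong();
                processWatermark.accept(lastTimestamp, true);
            }
        } else if (min > lastTimestamp) {
            lastTimestamp = min;
            processWatermark.accept(min, false);
        }
    }
}
```

In this sketch, a channel that has switched to processing time still takes part in the minimum, through its effective timestamp. The input's event-time watermark therefore keeps advancing as the remaining event-time channels advance.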


The chart below shows the effective watermark of the input given the watermark of the two input channels.

...

The `IndexedCombinedWatermarkStatus` represents the combined value and status of a watermark for a fixed number of partial input watermarks. The operator advances the event time of its `timeServiceManager` based on the combined watermark. `IndexedCombinedWatermarkStatus` ensures that the watermark timestamp of an operator with multiple inputs is the minimum timestamp among all the inputs, and that it is monotonically increasing.
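The combining rule for a multi-input operator can be sketched as follows. `CombinedWatermarkSketch` is a hypothetical, simplified model rather than the real `IndexedCombinedWatermarkStatus`. It makes two assumptions:

  • Idle inputs are excluded from the minimum.
  • Each input's partial watermark only moves forward.

The combined value only ever increases.

```java
import java.util.Arrays;

// Hypothetical, simplified model of combining per-input watermarks in a multi-input operator.
final class CombinedWatermarkSketch {
    private final long[] partialWatermarks;
    private final boolean[] idle;
    private long combinedWatermark = Long.MIN_VALUE;

    CombinedWatermarkSketch(int numInputs) {
        partialWatermarks = new long[numInputs];
        Arrays.fill(partialWatermarks, Long.MIN_VALUE);
        idle = new boolean[numInputs];
    }

    /** Returns true if the combined watermark advanced. */
    boolean updateWatermark(int input, long timestamp) {
        // Assumption: a partial watermark never moves backwards.
        partialWatermarks[input] = Math.max(partialWatermarks[input], timestamp);
        return recompute();
    }

    /** Returns true if the combined watermark advanced. */
    boolean updateStatus(int input, boolean isIdle) {
        idle[input] = isIdle;
        return recompute();
    }

    long getCombinedWatermark() {
        return combinedWatermark;
    }

    private boolean recompute() {
        long min = Long.MAX_VALUE;
        boolean allIdle = true;
        for (int i = 0; i < partialWatermarks.length; i++) {
            if (!idle[i]) { // assumption: idle inputs do not hold back the combined watermark
                allIdle = false;
                min = Math.min(min, partialWatermarks[i]);
            }
        }
        // Monotonic: only advance, never regress.
        if (!allIdle && min > combinedWatermark) {
            combinedWatermark = min;
            return true;
        }
        return false;
    }
}
```

The boolean return value tells the caller whether to advance the operator's event time.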

...

  • If the temporal join is based on row time (event time), the `TemporalRowTimeJoinOperator` is used.

  • If the temporal join is based on processing time:

    • If the `TemporalFunctionJoin` syntax is used, the `TemporalProcessTimeJoinOperator` is used.

    • If the `TemporalFunctionJoin` syntax is not used, an exception is thrown.
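The original operator choice reduces to a small decision function. In this hypothetical sketch, `OriginalTemporalJoinChoice` and its nested `TimeAttribute` enum are illustrative names only, and operator names are returned as strings for clarity. This is not how the planner actually builds operators.

```java
// Hypothetical sketch of the original operator choice for a temporal join.
final class OriginalTemporalJoinChoice {
    enum TimeAttribute { ROW_TIME, PROC_TIME }

    static String chooseOperator(TimeAttribute time, boolean usesTemporalFunctionJoin) {
        if (time == TimeAttribute.ROW_TIME) {
            return "TemporalRowTimeJoinOperator";
        }
        // Processing-time temporal join: only supported with the TemporalFunctionJoin syntax.
        if (usesTemporalFunctionJoin) {
            return "TemporalProcessTimeJoinOperator";
        }
        throw new UnsupportedOperationException(
                "Processing-time temporal join requires the TemporalFunctionJoin syntax");
    }
}
```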

Updated behavior:

  • If the temporal join is based on row time (event time), the `TemporalRowTimeJoinOperator` is used. 

  • If the temporal join is based on processing time:

    • If the `WatermarkCharacteristic` of any source is `UNDEFINED`:

      • If the `TemporalFunctionJoin` syntax is used, the `TemporalProcessTimeJoinOperator` is used.

      • If the `TemporalFunctionJoin` syntax is not used, an exception is thrown.

    • Otherwise, the `TemporalRowTimeJoinOperator` is used.
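The updated selection rule can be sketched the same way. `Characteristic` is a hypothetical stand-in for `WatermarkCharacteristic`. Only `UNDEFINED` comes from the text, and `OTHER` represents whatever non-`UNDEFINED` values the real enum defines. The class name and string return values are also illustrative assumptions.

```java
import java.util.List;

// Hypothetical sketch of the updated operator choice for a temporal join.
final class UpdatedTemporalJoinChoice {
    enum TimeAttribute { ROW_TIME, PROC_TIME }

    // Hypothetical stand-in for WatermarkCharacteristic; OTHER means "any value except UNDEFINED".
    enum Characteristic { UNDEFINED, OTHER }

    static String chooseOperator(
            TimeAttribute time, List<Characteristic> sources, boolean usesTemporalFunctionJoin) {
        if (time == TimeAttribute.ROW_TIME) {
            return "TemporalRowTimeJoinOperator";
        }
        // Processing-time join with at least one source whose characteristic is UNDEFINED:
        // fall back to the original behavior.
        if (sources.contains(Characteristic.UNDEFINED)) {
            if (usesTemporalFunctionJoin) {
                return "TemporalProcessTimeJoinOperator";
            }
            throw new UnsupportedOperationException(
                    "Processing-time temporal join requires the TemporalFunctionJoin syntax");
        }
        // No source is UNDEFINED: the processing-time join runs on the row-time operator.
        return "TemporalRowTimeJoinOperator";
    }
}
```

The design reuses the row-time operator for processing-time joins whenever every source declares its watermark characteristic. The dedicated processing-time operator is only kept for the legacy syntax.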

...