Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

1) Add the job-level config piplinepipeline.backlog.watermark-lag-threshold

...

Here is the specification of this config. This config will be marked Experimental.

  • Name: piplinepipeline.backlog.watermark-lag-threshold
  • Type: Duration
  • Default value: null
  • Description: The threshold of watermark lag for backlog detection. A source is considered to be processing backlog data if its watermark lags behind the current time by a value greater than the threshold. If it is null, source will not report its backlog status based on watermark lag. If it is If the configured value is not null, it must be greater than 0. Note that if there are multiple backlog statuses from different backlog strategies, they are combined using a logical OR operation. It is important to set the threshold according to the WatermarkStrategy, and a source would report isProcessingBacklog=true if its watermarkLag metric is defined and the metric value exceeds the configured value. Note that this config does not support switching source's isProcessingBacklog from false to true for now.

Note:

  • The config directly determines when the source's backlog status should be True. However, it does not directly determine when the backlog should be false. The source's backlog is set to false if there is NO strategy (e.g. FLIP-309, FLIP-328) that says the source's backlog should be set to True. In other words, if the configured value is null, or if the watermark lag is less than or equal to the configured value, source's isProcessingBacklog status will not be affected by this config.
  • The watermarkLag metric is proposed in FLIP-33.

Proposed Changes

  1. Update the SourceOperator to emit RecordAttribute
    1. When the SourceOperator emits a watermark to downstream operators, it checks the watermark lag and emitsRecordAttribute(backlog=true) event to the downstream operator. The watermark lag is calculated the same way as defined in FLIP-33, WatermarkLag = CurrentTime - Watermark. When the current watermark lags behind the current time by a value greater than the threshold. A RecordAttribute(backlog=false) is sent when the watermark lag is within the threshold.
    2. When the watermark status of the SourceOperator changes to idle, it emits the RecordAttribute(backlog=false) event to downstream operators. When the watermark status changes from idle to active, it will check the current watermark and emit RecordAttributeevent according to the watermark threshold.

...

The changes made in this FLIP are backward compatible. With the default value of piplinepipeline.backlog.watermark-lag-threshold, the source will not report its backlog status based on watermark lag, which is the current behavior. 


Future Work

Currently, FLIP-327 only supports switching from batch to stream mode. Therefore, the watermark-based backlog strategy in this FLIP can only switch from "isBacklog=true" to "isBacklog=false" but not the other way around. We plan to allow the source operator to switch from "isBacklog=false" to "isBacklog=true" based on watermark when Flink runtime supports switching from stream to batch mode in the future.


Rejected Alternatives

1. Specify the watermark lag threshold for each Source

...