Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).



Motivation

In certain use cases, a user may start a Flink job that reads from a Kafka topic, beginning at an offset corresponding to data from 5 days ago. The user requires that the records from the last 5 days are processed with high throughput to quickly backfill the results, and that the job then continues processing real-time data in a timely fashion. 

FLIP-309 introduces the concept of processing backlog. When a record emitted by a source carries the attribute isBacklog=true, downstream operators can optimize for throughput over latency. 

In this FLIP, we propose to enable source operators to report whether they are processing backlog based on the watermark lag. 


Public Interfaces

1) Add the job-level config pipeline.backlog.watermark-lag-threshold


Here is the specification of this config. This config will be marked Experimental.

  • Name: pipeline.backlog.watermark-lag-threshold
  • Type: Duration
  • Default value: null
  • Description: If the configured value is not null, it must be greater than 0, and a source reports isProcessingBacklog=true if its watermarkLag metric is defined and the metric value exceeds the configured value. Note that this config does not currently support switching the source's isProcessingBacklog from false to true.

Note:

  • The config directly determines when the source's backlog status should be true. However, it does not directly determine when the backlog status should be false. The source's backlog status is set to false only if NO strategy (e.g. FLIP-309, FLIP-328) says it should be true. In other words, if the configured value is null, or if the watermark lag is less than or equal to the configured value, the source's isProcessingBacklog status is not affected by this config.
  • The watermarkLag metric is proposed in FLIP-33.
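The decision rule above can be sketched as a small predicate. This is a minimal, hypothetical model of the semantics described in the config spec, not the actual Flink implementation; the class and method names are illustrative:

```java
import java.time.Duration;
import java.util.OptionalLong;

public class BacklogRule {
    /**
     * Returns true only when a threshold is configured, the watermarkLag
     * metric is defined, and the lag exceeds the threshold. In all other
     * cases this strategy leaves the backlog status unaffected (false here).
     */
    static boolean isProcessingBacklog(Duration threshold, OptionalLong watermarkLagMs) {
        if (threshold == null || watermarkLagMs.isEmpty()) {
            return false; // config unset or metric undefined: no effect
        }
        return watermarkLagMs.getAsLong() > threshold.toMillis();
    }

    public static void main(String[] args) {
        Duration fiveMin = Duration.ofMinutes(5);
        System.out.println(isProcessingBacklog(fiveMin, OptionalLong.of(600_000))); // lag 10 min -> true
        System.out.println(isProcessingBacklog(fiveMin, OptionalLong.of(60_000)));  // lag 1 min -> false
        System.out.println(isProcessingBacklog(null, OptionalLong.of(600_000)));    // unset -> false
    }
}
```

Note that a lag exactly equal to the threshold does not trigger the backlog status, matching the "less than or equal to" wording above.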

Proposed Changes

  1. Update the SourceOperator to emit RecordAttributes
    1. When the SourceOperator emits a watermark to downstream operators, it checks the watermark lag and emits a RecordAttributes(backlog=true) event to the downstream operators if the watermark lags behind the current time by a value greater than the threshold. The watermark lag is calculated the same way as defined in FLIP-33: WatermarkLag = CurrentTime - Watermark. A RecordAttributes(backlog=false) event is sent when the watermark lag is within the threshold.
    2. When the watermark status of the SourceOperator changes to idle, it emits a RecordAttributes(backlog=false) event to downstream operators. When the watermark status changes from idle to active, it checks the current watermark and emits a RecordAttributes event according to the watermark lag threshold.
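The emission logic in steps 1 and 2 can be modeled as below. This is a self-contained sketch, not the SourceOperator itself; the class, the Boolean return convention, and the method names are illustrative assumptions (per the config note, the runtime currently only acts on the true-to-false transition):

```java
public class BacklogEmitter {
    private final long thresholdMs;
    private Boolean lastBacklog; // null until the first emission

    BacklogEmitter(long thresholdMs) {
        this.thresholdMs = thresholdMs;
    }

    /**
     * Called whenever a watermark is emitted. Returns the backlog flag to
     * forward downstream as a RecordAttributes event, or null if the
     * status is unchanged and nothing needs to be emitted.
     */
    Boolean onWatermark(long watermark, long currentTime) {
        long lag = currentTime - watermark; // WatermarkLag as defined in FLIP-33
        boolean backlog = lag > thresholdMs;
        if (lastBacklog == null || backlog != lastBacklog) {
            lastBacklog = backlog;
            return backlog; // emit RecordAttributes(backlog=...)
        }
        return null; // no change, nothing to emit
    }

    /** Called when the watermark status switches to idle. */
    Boolean onIdle() {
        if (lastBacklog == null || lastBacklog) {
            lastBacklog = false;
            return false; // an idle source is not processing backlog
        }
        return null;
    }
}
```

A run with a 5-minute threshold would first emit backlog=true while the watermark trails by 10 minutes, stay silent while the status is unchanged, and emit backlog=false once the lag falls within the threshold.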


Compatibility, Deprecation, and Migration Plan

The changes made in this FLIP are backward compatible. With the default value of pipeline.backlog.watermark-lag-threshold, the source will not report its backlog status based on watermark lag, which is the current behavior. 


Future Work

Currently, FLIP-327 only supports switching from batch to stream mode. Therefore, the watermark-based backlog strategy in this FLIP can only switch from "isBacklog=true" to "isBacklog=false" but not the other way around. We plan to allow the source operator to switch from "isBacklog=false" to "isBacklog=true" based on the watermark lag once the Flink runtime supports switching from stream mode to batch mode in the future.


Rejected Alternatives

1. Specify the watermark lag threshold for each Source

  • Add getProcessingBacklogWatermarkLagThresholdMs method to the SourceReader interface.
    @Public
    public interface SourceReader<T, SplitT extends SourceSplit>
            extends AutoCloseable, CheckpointListener {
    
        /**
         * Returns the threshold in milliseconds for backlog detection. The source is considered to
         * be processing backlog if its watermark lags behind the current time by a value greater than
         * the threshold.
         *
         * <p>The method returns {@code Long.MAX_VALUE} by default, which indicates that the source is never considered to be
         * processing backlog.
         *
         * @return The threshold in milliseconds of watermark lag for backlog detection. 
         */
        default long getProcessingBacklogWatermarkLagThresholdMs() {
            return Long.MAX_VALUE;
        }
    }
  • Update KafkaSource API to allow users to specify the threshold. 
    @PublicEvolving
    public class KafkaSourceBuilder<OUT> {
        ...
        
    
        KafkaSourceBuilder() {
          ...
          this.processingBacklogWatermarkLagThresholdMs = Long.MAX_VALUE;
        }
    
        /**
         * Sets threshold in milliseconds for backlog detection. The Kafka source is considered to
         * be processing backlog if its watermark lags behind the current time by a value greater than the threshold.
         */
        public KafkaSourceBuilder<OUT> setProcessingBacklogWatermarkLagThresholdMs(long processingBacklogWatermarkLagThresholdMs) {
          ...
        }
    }


2. Use two thresholds or a backoff period to avoid the backlog status switching back and forth

The backlog status switches back and forth when the job's throughput with isBacklog = false cannot keep up with the data source, while its throughput with isBacklog = true, which is optimized for throughput, can keep up with the data source. 

If the throughput when isBacklog = false cannot keep up with the data source and the job never switches back to isBacklog = true, the watermark lag will keep increasing. Introducing two thresholds or a backoff period can only mitigate the problem; the real problem is that the throughput of the real-time processing cannot keep up with the data source.
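For reference, the rejected two-threshold (hysteresis) variant would look roughly like the sketch below. The class and parameter names are hypothetical; it switches to backlog above the high threshold and back only below the low threshold, which delays, but as argued above does not prevent, oscillation when real-time throughput is insufficient:

```java
public class HysteresisRule {
    private final long lowMs;  // switch back to isBacklog=false at or below this lag
    private final long highMs; // switch to isBacklog=true above this lag
    private boolean backlog;

    HysteresisRule(long lowMs, long highMs) {
        this.lowMs = lowMs;
        this.highMs = highMs;
    }

    /** Updates and returns the backlog status for the observed watermark lag. */
    boolean update(long watermarkLagMs) {
        if (!backlog && watermarkLagMs > highMs) {
            backlog = true;          // lag grew past the high threshold
        } else if (backlog && watermarkLagMs <= lowMs) {
            backlog = false;         // lag shrank below the low threshold
        }
        return backlog;              // lag between the thresholds: keep prior status
    }
}
```

If the real-time throughput cannot keep up, the lag eventually climbs past the high threshold again regardless of the gap between the two thresholds, which is why this alternative was rejected.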