You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 11 Next »

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).



Motivation

In certain use-cases, a user may start a Flink job to read from a Kafka topic, beginning from an offset corresponding to data from 5 days ago. And user requires that records from 5 days ago are processed with high throughput to quickly backfill the result for the last 5 days and continue processing the real-time data in a timely fashion. 

FLIP-309 introduces the concept of processing backlog. When a record emitted by a source contains the attribute isBackLog=true, downstream operators can optimize for throughput over latency. 

In this FLIP, we propose to enable source operators to report whether they are processing backlog based on the watermark lag. 


Public Interfaces

1) Add the job-level config pipline.backlog.watermark-lag-threshold


Here is the specification of this config. This config will be marked Experimental.

  • Name: pipline.backlog.watermark-lag-threshold
  • Type: Duration
  • Default value: null
  • Description: The threshold of watermark lag for backlog detection. A source is considered to be processing backlog data if its watermark lags behind the current time by a value greater than the threshold. Once the source is marked as not processing backlog, the status persists even if the watermark lag surpasses the threshold subsequently. If it is null, source will not report its backlog status based on watermark lag. If it is not null, it must be greater than 0. Note that if there are multiple backlog statuses from different backlog strategies, they are combined using a logical OR operation. It is important to set the threshold according to the WatermarkStrategy.


Proposed Changes

  1. Update the SourceOperator to emit RecordAttribute
    1. When the SourceOperator emits a watermark to downstream operators, it checks the watermark lag and emitsRecordAttribute(backlog=true) event to the downstream operator. The watermark lag is calculated the same way as defined in FLIP-33, WatermarkLag = CurrentTime - Watermark. When the current watermark lags behind the current time by a value greater than the threshold. A RecordAttribute(backlog=false) is sent when the watermark lag is within the threshold.
    2. When the watermark status of the SourceOperator changes to idle, it emits the RecordAttribute(backlog=false) event to downstream operators. When the watermark status changes from idle to active, it will check the current watermark and emit RecordAttributeevent according to the watermark threshold.


Compatibility, Deprecation, and Migration Plan

The changes made in this FLIP are backward compatible. With the default value of pipline.backlog.watermark-lag-threshold, the source will not report its backlog status based on watermark lag, which is the current behavior. 


Future Work

Currently, FLIP-327 only supports switching from batch to stream mode. Therefore, the watermark-based backlog strategy in this FLIP can only switch from "isBacklog=true" to "isBacklog=false" but not the other way around. We plan to allow the source operator to switch from "isBacklog=false" to "isBacklog=true" based on watermark when Flink runtime supports switching from stream to batch mode in the future.


Rejected Alternatives

1. Specify the watermark lag threshold for each Source

  • Add getProcessingBacklogEventTimeLagThreshold method to the SourceReader interface.
    @Public
    public interface SourceReader<T, SplitT extends SourceSplit>
            extends AutoCloseable, CheckpointListener {
    
        /**
         * Returns the threshold in milliseconds for backlog detection. The  source is considered to
         * be processing backlog if its watermark lags behind the current time by a value greater than
         * the threshold.
         *
         * <p>The method returns {@code Long.MAX_VALUE} by default, which indicates that the source is never considered to be
         * processing backlog.
         *
         * @return The threshold in milliseconds of watermark lag for backlog detection. 
         */
        default long getProcessingBacklogWatermarkLagThresholdMs() {
            return Long.MAX_VALUE;
        }
    }
  • Update KafkaSource API to allow users to specify the threshold. 
    @PublicEvolving
    public class KafkaSourceBuilder<OUT> {
        ...
        
    
        KafkaSourceBuilder() {
          ...
          this.processingBacklogWatermarkLagThresholdMs = Long.MAX_VALUE;
        }
    
        /**
         * Sets threshold in milliseconds for backlog detection. The Kafka source is considered to
         * be processing backlog if its watermark lags behind the current time by a value greater than the threshold.
         */
        public KafkaSourceBuilder<OUT> setProcessingBacklogWatermarkLagThresholdMs(long processingBacklogWatermarkLagThresholdMs) {
          ...
        }
    }


2. Use two thresholds or backoff period to avoid backlog status switching processing back and forth

The scenario of backlog status switching back and forth happens when the throughput of the job when isBacklog = false cannot keep up with the data source. And the throughput of the job when isBacklog = true, which is optimized for throughput, can keep up with the data source. 

If the throughput when isBacklog = false cannot keep up with the data source and the job never switches to isBacklog = true, the watermark lag will keep increasing. Introducing the two thresholds method or backoff period can only mitigate the problem. And the real problem is that the throughput of the real-time processing cannot keep up with the data source.





  • No labels