Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In certain use cases, a user may start a Flink job that reads from a Kafka topic, beginning at an offset corresponding to data from 5 days ago. The user wants the records from the last 5 days to be processed with high throughput, so that the results are quickly backfilled, and then wants the job to continue processing real-time data with low latency.
FLIP-309 introduces the concept of processing backlog. When a record emitted by a source carries the attribute isBacklog=true, downstream operators can optimize for throughput rather than latency.
In this FLIP, we propose to enable source operators to report whether they are processing backlog based on the watermark lag.
Public Interfaces
1) Add the job-level config pipeline.backlog.watermark-lag-threshold
Here is the specification of this config. This config will be marked Experimental.
- Name: pipeline.backlog.watermark-lag-threshold
- Type: Duration
- Default value: null
- Description: The threshold of watermark lag for backlog detection. A source is considered to be processing backlog data if its watermark lags behind the current time by a value greater than the threshold. If it is null, the source will not report its backlog status based on watermark lag. If it is not null, it must be greater than 0. Note that if there are multiple backlog statuses from different backlog strategies, they are combined using a logical OR operation. It is important to set the threshold in accordance with the job's WatermarkStrategy.
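The check implied by this config can be sketched as follows. This is a minimal illustration of the comparison, not Flink's implementation; the class and method names are hypothetical.

```java
import java.time.Duration;

/**
 * Minimal sketch (not Flink's actual code) of the backlog check implied by
 * pipeline.backlog.watermark-lag-threshold: a source is considered to be
 * processing backlog when CurrentTime - Watermark exceeds the threshold.
 */
public class WatermarkBacklogCheck {

    /** Returns true if the watermark lag exceeds the threshold; a null threshold disables the check. */
    public static boolean isBacklog(long currentTimeMs, long watermarkMs, Duration threshold) {
        if (threshold == null) {
            return false; // default behavior: never report backlog based on watermark lag
        }
        long lagMs = currentTimeMs - watermarkMs;
        return lagMs > threshold.toMillis();
    }

    public static void main(String[] args) {
        Duration threshold = Duration.ofMinutes(5);
        // Watermark 10 minutes behind current time -> backlog
        System.out.println(isBacklog(600_000L, 0L, threshold)); // true
        // Watermark 1 minute behind current time -> not backlog
        System.out.println(isBacklog(60_000L, 0L, threshold));  // false
    }
}
```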
Proposed Changes
- Update the SourceOperator to emit RecordAttributes
- When the SourceOperator emits a watermark to downstream operators, it checks the watermark lag, which is calculated the same way as defined in FLIP-33: WatermarkLag = CurrentTime - Watermark. If the watermark lags behind the current time by a value greater than the threshold, the SourceOperator emits a RecordAttributes(backlog=true) event to the downstream operators. A RecordAttributes(backlog=false) event is sent when the watermark lag is within the threshold.
- When the watermark status of the SourceOperator changes to idle, it emits a RecordAttributes(backlog=false) event to the downstream operators. When the watermark status changes from idle to active, it checks the current watermark and emits a RecordAttributes event according to the watermark lag threshold.
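The behavior described above can be sketched as a small state machine. The class and method names below are hypothetical, not Flink internals; for illustration, repeated statuses are deduplicated so that an event is only recorded when the backlog status actually changes.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative sketch of the proposed SourceOperator behavior: on each
 * watermark, compare the watermark lag against the threshold and report the
 * backlog status downstream; when the source turns idle, report backlog=false.
 */
public class BacklogStatusTracker {
    private final long thresholdMs;
    private Boolean lastReported; // null until the first status is reported
    private final List<Boolean> emitted = new ArrayList<>();

    public BacklogStatusTracker(Duration threshold) {
        this.thresholdMs = threshold.toMillis();
    }

    /** Called when the operator emits a watermark to downstream operators. */
    public void onWatermark(long watermarkMs, long currentTimeMs) {
        report(currentTimeMs - watermarkMs > thresholdMs);
    }

    /** Called when the watermark status changes to idle. */
    public void onIdle() {
        report(false);
    }

    private void report(boolean backlog) {
        if (lastReported == null || lastReported != backlog) {
            emitted.add(backlog); // stands in for emitting RecordAttributes(backlog=...)
            lastReported = backlog;
        }
    }

    public List<Boolean> emittedStatuses() {
        return emitted;
    }

    public static void main(String[] args) {
        BacklogStatusTracker tracker = new BacklogStatusTracker(Duration.ofMinutes(5));
        tracker.onWatermark(0L, 600_000L);       // lag of 10 minutes -> backlog=true
        tracker.onWatermark(590_000L, 600_000L); // lag of 10 seconds -> backlog=false
        tracker.onIdle();                        // already false, nothing new emitted
        System.out.println(tracker.emittedStatuses()); // [true, false]
    }
}
```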
Compatibility, Deprecation, and Migration Plan
The changes made in this FLIP are backward compatible. With the default value of pipeline.backlog.watermark-lag-threshold, the source will not report its backlog status based on watermark lag, which is the current behavior.
Rejected Alternatives
1. Specify the watermark lag threshold for each Source
- Add a getProcessingBacklogEventTimeLagThreshold method to the SourceReader interface.
@Public
public interface SourceReader<T, SplitT extends SourceSplit>
        extends AutoCloseable, CheckpointListener {

    /**
     * Returns the threshold in milliseconds for backlog detection. The source is considered to
     * be processing backlog if its watermark lags behind the current time by a value greater
     * than the threshold.
     *
     * <p>The method returns {@code Long.MAX_VALUE} by default, which indicates that the source
     * is never considered to be processing backlog.
     *
     * @return The threshold in milliseconds of watermark lag for backlog detection.
     */
    default long getProcessingBacklogWatermarkLagThresholdMs() {
        return Long.MAX_VALUE;
    }
}
- Update the KafkaSource API to allow users to specify the threshold.
@PublicEvolving
public class KafkaSourceBuilder<OUT> {
    ...

    KafkaSourceBuilder() {
        ...
        this.processingBacklogWatermarkLagThresholdMs = Long.MAX_VALUE;
    }

    /**
     * Sets the threshold in milliseconds for backlog detection. The Kafka source is considered
     * to be processing backlog if its watermark lags behind the current time by a value greater
     * than the threshold.
     */
    public KafkaSourceBuilder<OUT> setProcessingBacklogWatermarkLagThresholdMs(
            long processingBacklogWatermarkLagThresholdMs) {
        ...
    }
}
2. Use two thresholds or a backoff period to avoid the backlog status switching back and forth
The backlog status switches back and forth when the job's throughput with isBacklog = false cannot keep up with the data source, while its throughput with isBacklog = true, which is optimized for throughput, can.
If the throughput when isBacklog = false cannot keep up with the data source and the job never switches to isBacklog = true, the watermark lag will keep increasing. Introducing two thresholds or a backoff period can only mitigate the problem; the real problem is that the throughput of the real-time processing cannot keep up with the data source.
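For reference, the rejected two-threshold idea amounts to hysteresis: enter the backlog state when the lag exceeds a high threshold and leave it only once the lag drops below a lower one. The sketch below uses hypothetical names and thresholds; as the text argues, this dampens flip-flopping but cannot fix a job whose real-time throughput cannot keep up.

```java
/**
 * Sketch of the rejected two-threshold (hysteresis) alternative: the backlog
 * status only changes when the watermark lag crosses the outer of the two
 * thresholds, which reduces rapid switching back and forth.
 */
public class HysteresisBacklogCheck {
    private final long enterThresholdMs;
    private final long exitThresholdMs;
    private boolean backlog;

    public HysteresisBacklogCheck(long enterThresholdMs, long exitThresholdMs) {
        this.enterThresholdMs = enterThresholdMs;
        this.exitThresholdMs = exitThresholdMs;
    }

    /** Updates and returns the backlog status for the given watermark lag. */
    public boolean update(long lagMs) {
        if (!backlog && lagMs > enterThresholdMs) {
            backlog = true;   // switch to backlog only above the high threshold
        } else if (backlog && lagMs < exitThresholdMs) {
            backlog = false;  // switch back only once the lag has clearly recovered
        }
        return backlog;
    }

    public static void main(String[] args) {
        // Enter backlog above 10 minutes of lag, leave it below 1 minute.
        HysteresisBacklogCheck check = new HysteresisBacklogCheck(600_000L, 60_000L);
        System.out.println(check.update(700_000L)); // true: entered backlog
        System.out.println(check.update(300_000L)); // true: stays in backlog
        System.out.println(check.update(30_000L));  // false: lag recovered
    }
}
```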