...

Discussion thread: https://lists.apache.org/thread/29nvjt9sgnzvs90browb8r6ng31dcs3n
Vote thread: https://lists.apache.org/thread/7cj6pzx7w0ynqyogxgk668wjr322mcqw
JIRA: FLINK-33202
Release: -
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Extra notes: For jobs with multiple sources and execution.checkpointing.interval-during-backlog = 0, checkpoint triggering is enabled if and only if all sources have isBacklog=false. (More details on the checkpointing behavior can be found in the doc of execution.checkpointing.interval-during-backlog.)

As a result, if one source has isBacklog=true and another source switches from isBacklog=true to isBacklog=false, checkpointing for the job remains disabled.
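The rule above can be sketched as a small predicate. This is only an illustration for the case execution.checkpointing.interval-during-backlog = 0; the class and method names are hypothetical and are not part of Flink.

```java
import java.util.List;

/** Hypothetical helper (not a Flink class) modeling the rule for interval-during-backlog = 0. */
final class BacklogCheckpointGate {

    /**
     * @param sourceIsBacklog the latest isBacklog value reported by each source of the job
     * @return true only if every source currently reports isBacklog=false
     */
    static boolean isCheckpointTriggeringEnabled(List<Boolean> sourceIsBacklog) {
        // A single source that is still processing backlog keeps checkpointing disabled.
        return sourceIsBacklog.stream().noneMatch(Boolean::booleanValue);
    }

    public static void main(String[] args) {
        // One source is still in backlog, the other has caught up.
        System.out.println(isCheckpointTriggeringEnabled(List.of(true, false)));
        // Both sources have caught up.
        System.out.println(isCheckpointTriggeringEnabled(List.of(false, false)));
    }
}
```

With one source still at isBacklog=true the predicate stays false, which matches the two-source scenario described above.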

...

  • Managed memory is allocated to operators with keyed inputs.

  • A general purpose keyed state backend that does not assume inputs sorted by key (e.g. EmbeddedRocksDBStateBackend) is instantiated by Flink runtime.

  • Before the source operator emits isBacklog=false (that is, during backlog processing), the keyed input to a one-input operator is sorted by key.

    • Operators with multiple inputs can buffer and sort the inputs internally. We will provide utility classes (derived from the existing ExternalSorter) to sort inputs.
    • Flink runtime will buffer/aggregate each state key/value in memory before persisting it to the underlying state backend (e.g. RocksDB), so that for input records that are already sorted on the given key, there will be at most one read/write access to the underlying state backend for each key.
  • At the point when isBacklog switches to false:
    • Source operator emits RecordAttributes(isBacklog=false).
  • The operator continues to process records using the keyed state backend instantiated by Flink runtime, which now contains the full state obtained from records received while isBacklog=true.
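To illustrate why sorted input bounds state backend accesses, here is a minimal sketch of a per-key sum that keeps only the current key's value in memory. It assumes "one read/write access" means one read and one write per key; CountingBackend is a hypothetical stand-in for the underlying state backend, not a Flink class.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: per-key aggregation over input that is already sorted by key. */
final class SortedInputStateAccessSketch {

    /** Hypothetical stand-in for the underlying state backend; it counts accesses. */
    static final class CountingBackend {
        final Map<String, Long> store = new HashMap<>();
        int reads;
        int writes;

        long read(String key) {
            reads++;
            return store.getOrDefault(key, 0L);
        }

        void write(String key, long value) {
            writes++;
            store.put(key, value);
        }
    }

    /** Sums values per key. The records must be sorted by key. */
    static void sumSortedByKey(List<Map.Entry<String, Long>> sortedRecords, CountingBackend backend) {
        String currentKey = null;
        long currentSum = 0L;
        for (Map.Entry<String, Long> record : sortedRecords) {
            if (!record.getKey().equals(currentKey)) {
                // Key boundary: persist the finished key once, then load the next key once.
                if (currentKey != null) {
                    backend.write(currentKey, currentSum);
                }
                currentKey = record.getKey();
                currentSum = backend.read(currentKey);
            }
            // All further updates for the current key stay in memory.
            currentSum += record.getValue();
        }
        if (currentKey != null) {
            backend.write(currentKey, currentSum);
        }
    }
}
```

Because all records of a key are contiguous, the backend is touched only at key boundaries, regardless of how many records each key has.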

...

1) Add RecordAttributesBuilder and RecordAttributes, which extends StreamElement, to provide operators with essential information about the records they receive, such as whether the records are already stale due to backlog.

...

/**
 * A RecordAttributes element provides stream task with information that can be used to optimize
 * the stream task's performance.
 */
@Experimental
public class RecordAttributes extends StreamElement {
    /**
     * If it returns true, then the records received after this element are stale
     * and an operator can optionally buffer records until isBacklog=false. This
     * allows an operator to optimize throughput at the cost of processing latency.
     */
    @Nullable
    public Boolean isBacklog() {...}
}
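
As a rough illustration of how an operator might use this signal, the sketch below buffers records while isBacklog=true and flushes them once isBacklog=false arrives. The class and its method names are hypothetical and do not mirror Flink's operator API. Treating a null isBacklog as "no change" is an assumption made for this example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical operator sketch (not a Flink class): trades latency for throughput during backlog. */
final class BufferingOperatorSketch {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> output = new ArrayList<>();
    private boolean backlog = false;

    /** Called when a RecordAttributes-like element arrives; null is assumed to mean "no change". */
    void onRecordAttributes(Boolean isBacklog) {
        if (isBacklog == null) {
            return;
        }
        backlog = isBacklog;
        if (!backlog) {
            // Backlog is over: emit everything that was held back, in arrival order.
            output.addAll(buffer);
            buffer.clear();
        }
    }

    /** Called for each data record. */
    void onRecord(String record) {
        if (backlog) {
            buffer.add(record); // stale records: buffering is allowed
        } else {
            output.add(record); // fresh records: emit immediately
        }
    }

    List<String> output() {
        return output;
    }
}
```

A real operator would typically sort or pre-aggregate the buffered records before emitting them; the plain flush here only shows where that work would happen.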

...

  • If a OneInputTransformation has isInternalSorterSupported == false, keyed input, and execution.checkpointing.interval-during-backlog == 0:
    • Flink runtime will add an external sorter for the input to sort the keyed input during backlog processing.
    • Its managed memory should be set according to execution.sorted-inputs.memory.
    • Flink runtime will buffer/aggregate each state key/value in memory before persisting it to the underlying state backend (e.g. RocksDB), so that for input records that are already sorted on the given key, there will be at most one read/write access to the underlying state backend for each key.

With the above change, we can significantly improve the performance of keyed one-input operators during backlog processing (isBacklog=true) without any code change. Moreover, the operator's performance while isBacklog=true is close to its performance in batch mode.
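
The three conditions listed above for a OneInputTransformation can be summarized as a single predicate. This is a sketch only: the class, method, and parameter names are hypothetical, and the interval is assumed to be expressed in milliseconds.

```java
/** Hypothetical helper (not a Flink class) restating when the runtime adds an external sorter. */
final class SorterPlacementSketch {

    /**
     * @param isInternalSorterSupported whether the operator sorts its input internally
     * @param hasKeyedInput whether the transformation's input is keyed
     * @param intervalDuringBacklogMs value of execution.checkpointing.interval-during-backlog
     */
    static boolean runtimeAddsExternalSorter(
            boolean isInternalSorterSupported, boolean hasKeyedInput, long intervalDuringBacklogMs) {
        // All three conditions from the list must hold at the same time.
        return !isInternalSorterSupported && hasKeyedInput && intervalDuringBacklogMs == 0L;
    }
}
```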

...