Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

JIRA
Discussion threadhttps://lists.apache.org/thread/29nvjt9sgnzvs90browb8r6ng31dcs3n
Vote threadhttps://lists.apache.org/thread/7cj6pzx7w0ynqyogxgk668wjr322mcqw
JIRA

Jira
serverASF JIRA
columnIdsissuekey,summary,issuetype,created,updated,duedate,assignee,reporter,priority,status,resolution
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b

keyFLINK-33202

Release-

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  • Managed memory is allocated to operators with keyed inputs.

  • A general purpose keyed state backend that does not assume inputs sorted by key (e.g. EmbeddedRocksDBStateBackend) is instantiated by Flink runtime.

  • Before source operator emits isBacklog=false (during backlog processing), the keyed input to one input operator is sorted by the key.

    • Multiple input operators Operators with multiple inputs can buffer and sort the inputs internally. We will provide utility classes (derived from the existing ExternalSorter) to sort inputs.
      ) to sort inputs.
    • Flink runtime will buffer/aggregate state's key/value in memory before persisting state's key/value to the underlying state backend (e.g. rocksdb), such that for the input records that are already sorted on the given keyWith the sorted input and the state backend optimization introduced in FLIP-325, there will be at most one read/write access to the Rocksdb underlying state backend for each key. And we only need to write to the Rocksdb state backend if the lru cache is full.
  • At the point when isBacklog switches to false:
    • Source operator emits RecordAttributes(isBacklog=false).
  • The operator continues to process records using the keyed state backend instantiated by Flink runtime, which now contains the full state obtained from records received while isBacklog=true.

...

  • If a OneInputTransformation has isInternalSorterSupported == false, keyed input, and execution.checkpointing.interval-during-backlog == 0:
    • Flink runtime will add an external sorter for the input to sort the keyed input during backlog processing
    • Its managed memory should be set according to execution.sorted-inputs.memoryUse
    • a state backend with cache introduced in FLIP-325 with a cache size of 1 if the cache size isn't explicitly specified by users so that we optimize the state backend access while minimizing the memory overhead. Alternatively, when users have set a cache size, we will adhere to that settingFlink runtime will buffer/aggregate state's key/value in memory before persisting state's key/value to the underlying state backend (e.g. rocksdb), such that for the input records that are already sorted on the given key, there will be at most one read/write access to the underlying state backend for each key.

With the above change, we can significantly enhance the performance of keyed one input operators without code change when "isBacklog=true". Moreover, the performance of the operator during isBacklog=true is close to the performance in batch mode.

...