...

JIRA: https://jira.apache.org/jira/browse/FLINK-13980

Released: Flink 1.10

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

The basic idea is to consider memory used by RocksDB state backends as part of managed memory, and to extend the memory manager so that memory consumers can simply reserve a certain amount of memory from it without necessarily allocating the memory from it. In this way, users should be able to switch between streaming and batch jobs without having to modify the cluster configuration.

Memory Use Cases

...

  • Streaming jobs with RocksDBStateBackend
    • Off-heap memory
    • Implicitly allocated by the state backend
    • Cannot exceed total memory size, which is configured during initialization
  • Batch jobs
    • Off-heap memory
    • Explicitly allocated from the memory manager
    • Cannot exceed total memory allocated from memory manager

To make managed memory work with both cases, we should always allocate managed memory off-heap.

Unifying Explicit and Implicit Memory Allocation

  • Memory consumers can acquire memory in two ways
    • Explicitly acquire from MemoryManager, in the form of MemorySegment.
    • Reserve from MemoryManager, in which case the MemoryManager returns “use up to X bytes” and the consumer allocates the memory implicitly by itself.
  • MemoryManager never pre-allocates any memory pages, so that the managed memory budget stays available both for allocation through MemoryManager and for allocation directly by memory consumers.
  • For off-heap memory explicitly acquired from MemoryManager, Flink always allocates with Unsafe.allocateMemory(), which is not limited by the JVM -XX:MaxDirectMemorySize parameter.
    • This eliminates the uncertainty about how much off-heap managed memory should be accounted for in the JVM max direct memory.
    • The drawback is that Unsafe is no longer supported in Java 12.
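
A minimal Java sketch of the reserve-versus-acquire idea above: one budget, nothing pre-allocated, shared by explicit consumers (pages handed out by the manager) and implicit consumers (a byte quota they fill themselves). The class ManagedMemoryBudget and its methods are hypothetical, not Flink's MemoryManager API, and ByteBuffer.allocateDirect() stands in for Unsafe.allocateMemory() only to keep the example self-contained.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: names are illustrative, not Flink's MemoryManager API.
final class ManagedMemoryBudget {
    private final long totalBytes;
    private long availableBytes;

    ManagedMemoryBudget(long totalBytes) {
        this.totalBytes = totalBytes;
        // Nothing is allocated up front; the budget is pure bookkeeping.
        this.availableBytes = totalBytes;
    }

    // Implicit path: the consumer is granted "use up to bytes" and allocates the memory itself.
    synchronized boolean reserve(long bytes) {
        if (bytes < 0 || bytes > availableBytes) {
            return false;
        }
        availableBytes -= bytes;
        return true;
    }

    // Returns a reservation (or the size of released pages) to the budget.
    synchronized void release(long bytes) {
        if (bytes < 0 || bytes > totalBytes - availableBytes) {
            throw new IllegalArgumentException("releasing more than is reserved: " + bytes);
        }
        availableBytes += bytes;
    }

    // Explicit path: charge the same budget, then allocate the pages for the consumer.
    List<ByteBuffer> allocatePages(int numPages, int pageSize) {
        long bytes = (long) numPages * pageSize;
        if (!reserve(bytes)) {
            throw new IllegalStateException("not enough managed memory for " + bytes + " bytes");
        }
        List<ByteBuffer> pages = new ArrayList<>(numPages);
        for (int i = 0; i < numPages; i++) {
            // Stand-in for Unsafe.allocateMemory(); unlike Unsafe, direct buffers
            // ARE limited by -XX:MaxDirectMemorySize.
            pages.add(ByteBuffer.allocateDirect(pageSize));
        }
        return pages;
    }

    synchronized long availableBytes() {
        return availableBytes;
    }
}
```

Because both paths only decrement the same counter, a state backend reservation and a batch operator's pages can never together exceed the managed memory size.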

Separate On-Heap and Off-Heap Memory Pools for Managed Memory

Currently (Flink 1.9), all managed memory is allocated with the same type, either on-heap or off-heap. This works for the current use cases, where we do not necessarily need both on-heap and off-heap managed memory in the same task executor.

With the design in this proposal, memory usage of state backends is also considered as managed memory, which means we may have scenarios where jobs in the same cluster need different types of managed memory. E.g., a streaming job with MemoryStateBackend and another streaming job with RocksDBStateBackend.

Therefore, we separate the managed memory pool into the on-heap pool and the off-heap pool. We use an off-heap fraction to decide what fraction of managed memory should go into the off-heap pool, and leave the rest to the on-heap pool. Users can still configure the cluster to use all on-heap / off-heap managed memory by setting the off-heap fraction to 0 / 1.

Memory Pools and Configuration Keys

Framework Heap Memory

On-heap memory for the Flink task manager framework. It is not accounted for in slot resource profiles.

(taskmanager.memory.framework.heap.size)

(default 128mb)

Framework Off-Heap Memory

Off-heap memory for the Flink task manager framework. It is not accounted for in slot resource profiles.

(taskmanager.memory.framework.off-heap.size)

(default 128mb)

Task Heap Memory

...

(taskmanager.memory.task.heap.size)

Task Off-Heap Memory

Off-heap memory for user code.

(taskmanager.memory.task.off-heap.size)

(default 0b)

...

Network Memory

Off-heap memory for the shuffle service, e.g., network buffers.

(taskmanager.memory.network.[min/max/fraction])

(default min=64mb, max=1gb, fraction=0.1)
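
A sketch of how min, max and fraction could combine, assuming the pool is the configured fraction of Total Flink Memory clamped to [min, max]. FractionClamp.clampedFraction() is a hypothetical helper, not Flink code.

```java
// Hypothetical helper, not Flink's API: size a pool as a fraction of a base size,
// clamped to the configured [min, max] range.
final class FractionClamp {
    static long clampedFraction(long baseBytes, double fraction, long minBytes, long maxBytes) {
        if (fraction < 0.0 || fraction >= 1.0) {
            throw new IllegalArgumentException("fraction must be in [0, 1): " + fraction);
        }
        if (minBytes > maxBytes) {
            throw new IllegalArgumentException("min must not exceed max");
        }
        long raw = (long) (baseBytes * fraction); // truncates toward zero
        return Math.max(minBytes, Math.min(maxBytes, raw));
    }
}
```

With the defaults above, 1280mb of Total Flink Memory yields 128mb of Network Memory, 320mb yields 32mb raised to the 64mb minimum, and 20gb yields 2gb capped at the 1gb maximum.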

Managed Memory

On-heap and off-heap Flink managed memory.

(taskmanager.memory.managed.[size|fraction])

(taskmanager.memory.managed.offheap-fraction)

(default fraction=0.5, offheap-fraction=0.0)

On-Heap Managed Memory = Managed Memory * (1 - offheap-fraction)

Off-Heap Managed Memory = Managed Memory * offheap-fraction
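
The two formulas above, sketched in Java. ManagedMemorySplit is a hypothetical helper. Computing the on-heap part as the remainder, rather than multiplying by (1 - offheap-fraction) separately, is a choice made in this sketch so that the two pools always add up to Managed Memory despite truncation.

```java
// Hypothetical helper illustrating the on-heap / off-heap managed memory formulas.
final class ManagedMemorySplit {
    // Returns {onHeapBytes, offHeapBytes}.
    static long[] split(long managedBytes, double offHeapFraction) {
        if (offHeapFraction < 0.0 || offHeapFraction > 1.0) {
            throw new IllegalArgumentException("offheap-fraction must be in [0, 1]: " + offHeapFraction);
        }
        long offHeap = (long) (managedBytes * offHeapFraction);
        // On-heap part is the remainder, so onHeap + offHeap == managedBytes exactly.
        long onHeap = managedBytes - offHeap;
        return new long[] {onHeap, offHeap};
    }
}
```

Setting offheap-fraction to 0 or 1 puts all managed memory on-heap or off-heap respectively.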

JVM Metaspace

Off-heap memory for JVM metaspace.

(taskmanager.memory.jvm-metaspace)

(default 96mb)

JVM Overhead

Off-heap memory for thread stack space, I/O direct memory, compile cache, etc.

(taskmanager.memory.jvm-overhead.[min/max/fraction])

(default min=192mb, max=1gb, fraction=0.1)
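
As a worked example, assuming JVM Overhead is the configured fraction of Total Process Memory clamped to [min, max] with the defaults above, and taking hypothetical Total Process Memory sizes of 1728mb and 4096mb (the last line also subtracts the default 96mb JVM Metaspace):

```latex
\text{JVM Overhead} = \min\bigl(\max(0.1 \times \text{Total Process Memory},\ 192\,\text{MB}),\ 1024\,\text{MB}\bigr)

1728\,\text{MB}:\quad \min\bigl(\max(172.8,\ 192),\ 1024\bigr) = 192\,\text{MB}

4096\,\text{MB}:\quad \min\bigl(\max(409.6,\ 192),\ 1024\bigr) = 409.6\,\text{MB}

\text{Total Flink Memory}\big|_{1728\,\text{MB}} = 1728 - 192 - 96 = 1440\,\text{MB}
```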

Total Flink Memory

...

This includes Framework Heap Memory, Framework Off-Heap Memory, Task Heap Memory, Task Off-Heap Memory, Network Memory, and Managed Memory.

This excludes JVM Metaspace and JVM Overhead.

(taskmanager.memory.total-flink.size)

Total Process Memory

...

This includes Total Flink Memory, JVM Metaspace and JVM Overhead.

(taskmanager.memory.total-process.size)

JVM Parameters

  • JVM heap memory
    • Includes Framework Heap Memory, Task Heap Memory, and On-Heap Managed Memory
    • Explicitly set both -Xmx and -Xms to this value
  • JVM direct memory
    • Includes Framework Off-Heap Memory, Task Off-Heap Memory and Network Memory
    • Explicitly set -XX:MaxDirectMemorySize to this value
    • For Off-Heap Managed Memory, we always allocate memory with Unsafe.allocateMemory(), which is not limited by this parameter.
  • JVM metaspace
    • Set -XX:MaxMetaspaceSize to configured JVM Metaspace
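
A sketch of how the pool sizes could be turned into the JVM flags listed above. JvmArgs.build() is hypothetical; it passes raw byte counts (which -Xmx, -Xms and these -XX size flags accept) and deliberately leaves Off-Heap Managed Memory out of -XX:MaxDirectMemorySize, since that memory is allocated with Unsafe.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; all sizes are in bytes.
final class JvmArgs {
    static List<String> build(long frameworkHeap, long taskHeap, long onHeapManaged,
                              long frameworkOffHeap, long taskOffHeap, long network,
                              long metaspace) {
        // JVM heap: Framework Heap + Task Heap + On-Heap Managed Memory.
        long heap = frameworkHeap + taskHeap + onHeapManaged;
        // JVM direct memory: Framework Off-Heap + Task Off-Heap + Network Memory.
        // Off-Heap Managed Memory is excluded because Unsafe allocations bypass this limit.
        long direct = frameworkOffHeap + taskOffHeap + network;

        List<String> args = new ArrayList<>();
        args.add("-Xmx" + heap);
        args.add("-Xms" + heap); // same value, so the heap never resizes
        args.add("-XX:MaxDirectMemorySize=" + direct);
        args.add("-XX:MaxMetaspaceSize=" + metaspace);
        return args;
    }
}
```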

...

  • If both Task Heap Memory and Managed Memory are configured, we use these to derive Total Flink Memory
    • If Network Memory is configured explicitly, we use that value
    • Otherwise, we compute it such that it makes up the configured fraction of the final Total Flink Memory (see getAbsoluteOrInverseFraction())
  • If Total Flink Memory is configured, but not Task Heap Memory and Managed Memory, then we derive Network Memory and Managed Memory, and leave the rest (excluding Framework Heap Memory, Framework Off-Heap Memory and Task Off-Heap Memory) as Task Heap Memory.
    • If Network Memory is configured explicitly, we use that value
    • Otherwise we compute it such that it makes up the configured fraction of the Total Flink Memory (see getAbsoluteOrFraction())
    • If Managed Memory is configured explicitly, we use that value
    • Otherwise we compute it such that it makes up the configured fraction of the Total Flink Memory (see getAbsoluteOrFraction())
  • If only the Total Process Memory is configured, we derive the Total Flink Memory in the following way
    • We take the JVM Overhead (configured explicitly, or computed relative to Total Process Memory) and subtract it from Total Process Memory (see getAbsoluteOrFraction())
    • We subtract JVM Metaspace from the remainder
    • We leave the rest as Total Flink Memory
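
The three derivation paths above can be sketched as follows, with all sizes in bytes. MemoryDerivationSketch and its methods are hypothetical, not Flink's actual utilities; only the fraction-based branches are shown (an explicitly configured Network or Managed Memory would simply replace the computed value), and clamping the inverse-fraction result to [min, max] is an assumption. In the first case, Network Memory N must be a fraction f of the final total, N = f * (K + N) where K is the sum of the other components, which gives N = K * f / (1 - f).

```java
// Hypothetical sketch of the derivation rules; not Flink's actual utilities.
final class MemoryDerivationSketch {

    static long clamp(long value, long min, long max) {
        return Math.max(min, Math.min(max, value));
    }

    // Case 1: Task Heap and Managed Memory configured; Network Memory is a fraction of the result.
    static long totalFlinkFromComponents(long frameworkHeap, long frameworkOffHeap,
            long taskHeap, long taskOffHeap, long managed,
            double networkFraction, long networkMin, long networkMax) {
        long others = frameworkHeap + frameworkOffHeap + taskHeap + taskOffHeap + managed;
        // network = f * (others + network)  =>  network = others * f / (1 - f)
        long network = clamp((long) (others * networkFraction / (1 - networkFraction)),
                networkMin, networkMax);
        return others + network;
    }

    // Case 2: only Total Flink Memory configured; the rest becomes Task Heap Memory.
    static long taskHeapFromTotalFlink(long totalFlink, long frameworkHeap, long frameworkOffHeap,
            long taskOffHeap, double managedFraction,
            double networkFraction, long networkMin, long networkMax) {
        long network = clamp((long) (totalFlink * networkFraction), networkMin, networkMax);
        long managed = (long) (totalFlink * managedFraction);
        long taskHeap = totalFlink - frameworkHeap - frameworkOffHeap - taskOffHeap - network - managed;
        if (taskHeap < 0) {
            throw new IllegalArgumentException("Total Flink Memory is too small: " + totalFlink);
        }
        return taskHeap;
    }

    // Case 3: only Total Process Memory configured.
    static long totalFlinkFromTotalProcess(long totalProcess, long metaspace,
            double overheadFraction, long overheadMin, long overheadMax) {
        long overhead = clamp((long) (totalProcess * overheadFraction), overheadMin, overheadMax);
        long totalFlink = totalProcess - overhead - metaspace;
        if (totalFlink <= 0) {
            throw new IllegalArgumentException("Total Process Memory is too small: " + totalProcess);
        }
        return totalFlink;
    }
}
```

For example, 1728mb of Total Process Memory with 96mb of JVM Metaspace and overhead settings min=192mb, max=1gb, fraction=0.1 gives 1440mb of Total Flink Memory; with 128mb Framework Heap, 128mb Framework Off-Heap, no Task Off-Heap, managed fraction 0.5 and network fraction 0.1, that leaves 320mb of Task Heap Memory.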

...

Deprecated configuration keys are as follows:

  • taskmanager.heap.size
    • As fallback of: taskmanager.memory.total-flink.size (Standalone), taskmanager.memory.process.size (Yarn/Mesos/K8s)
  • taskmanager.heap.mb
    • As fallback of: taskmanager.memory.total-flink.size (Standalone), taskmanager.memory.process.size (Yarn/Mesos/K8s)
  • taskmanager.memory.size
    • As fallback of: taskmanager.memory.managed.size
  • taskmanager.memory.fraction
    • As fallback of: N/A
    • Notes: `taskmanager.memory.managed.fraction` now has different semantics.
  • taskmanager.memory.off-heap
    • As fallback of: N/A
    • Notes: `taskmanager.memory.off-heap` will be ignored, because we no longer support on-heap managed memory.
  • taskmanager.memory.preallocate
    • As fallback of: N/A
    • Notes: `taskmanager.memory.preallocate` will be ignored, because we no longer support pre-allocation of managed memory.
  • taskmanager.network.memory.[min/max/fraction]
    • As fallback of: taskmanager.memory.network.[min/max/fraction]
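
A plain-map sketch of the fallback rule above: the new key wins when present, otherwise a deprecated key is used. DeprecatedKeyFallback is hypothetical and does not use Flink's Configuration class; it also skips unit conversion, which taskmanager.heap.mb (a plain number of megabytes) would need.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of deprecated-key fallback over a plain Map.
final class DeprecatedKeyFallback {
    // Returns the new key's value if set, otherwise the first deprecated key that is set.
    static Optional<String> resolve(Map<String, String> conf, String newKey, String... deprecatedKeys) {
        if (conf.containsKey(newKey)) {
            return Optional.of(conf.get(newKey));
        }
        for (String key : deprecatedKeys) {
            if (conf.containsKey(key)) {
                return Optional.of(conf.get(key));
            }
        }
        return Optional.empty();
    }

    // taskmanager.heap.size falls back into a different new key depending on the deployment.
    static String newKeyForHeapSize(boolean containerized) {
        return containerized
                ? "taskmanager.memory.process.size"     // Yarn/Mesos/K8s
                : "taskmanager.memory.total-flink.size"; // Standalone
    }
}
```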

Test Plan

  • We need to update existing integration tests and add new ones dedicated to validating the new memory configuration behaviors.
  • We also expect other regular integration and end-to-end tests to fail if this is broken.

...