Discussion thread: https://lists.apache.org/thread/m4n8q434f284dsbfy6q0cc1dqklnoy3o
Vote thread: https://lists.apache.org/thread/hmn2fstyb2gvolmvn65tm8st9jokt08t
JIRA


Release: 1.11

Motivation

This FLIP suggests aligning the memory model and configuration of the Job Manager (JM) with the memory model of the Task Manager (TM) recently introduced in FLIP-49.

The memory model of JM does not need to be as extensive as the TM one, and many of the motivation points in FLIP-49 do not apply here. Nonetheless, apart from aligning the two memory models, there are a couple of explicit issues with the current memory settings of JM:

  • The `jobmanager.heap.size` option is misleading for the containerised environments (Yarn/Mesos/Kubernetes) because it does not represent the JM's JVM heap size but the total memory consumed by the Flink JVM process, including the container cut-off. It is used to set the JM container memory size requested in a containerised environment.
  • The purpose of the container cut-off can also be confusing on its own. Its main use cases are:
    • Off-heap memory usage by Flink or user code dependencies (in certain cases, user code runs during job start-up)
    • JVM Metaspace
    • Other JVM overhead
  • There is no reasonable way to limit JVM Direct Memory allocation, so it is not controlled by the JVM. This makes it hard to debug off-heap memory leaks and containers being killed for exceeding their memory limit (OOM).
  • The same applies to the JVM Metaspace size: without a limit, possible class loading leaks are hard to expose.

Public Interfaces

JM memory configuration options. See 'Proposed Changes' and 'Compatibility, Deprecation, and Migration Plan'.

Proposed Changes

Unlike TM, JM has no special memory use cases or memory management. Its memory consumption boils down to the following main sources, depicted in the following picture:

Each of these memory components is described in detail below.

Total Process Memory

The Total Process Memory is the full memory consumption of the running JM JVM system process. As for TM, it consists of the Total Flink Memory and JVM-specific memory (Metaspace and other Overhead). This is also the size of the JM container requested in a containerised environment (Yarn/Mesos/Kubernetes). In the standalone environment, this size does not really affect Flink behaviour, except for deriving the JVM Heap size.

As for TM, configuring the size of this memory can be the easiest way to set up JM memory. The sizes of the JVM Overhead and Total Flink Memory can then be derived from it. See also 'Configuration process' below.

Total Flink Memory

The Total Flink Memory is the memory consumed by the framework and job user code. It does not include JVM-specific memory (Metaspace and other Overhead).

As for TM, configuring the size of this memory is another way to set up memory. It is an easy option for the standalone environment, as it requires no thinking about JVM memory consumption. The JVM Heap, the Total Process Memory and the JVM Overhead (mostly relevant for the containerised environments) can always be derived from it. See also 'Configuration process' below.

JVM Heap

The JVM Heap size is set via the corresponding JVM arguments (-Xms and -Xmx) when the JM process is started by the standalone scripts or the container startup script. The JVM Heap in JM can be consumed by the following sources:

  • Flink framework
  • User code running during job submission in certain cases or in checkpoint completion callbacks
  • Job Cache


The size of the JVM Heap is mostly driven by the number of running jobs, their structure and the requirements of the mentioned user code.

Setting the JVM Heap size is the third way to set up memory for JM. It can be a simple way when the other memory components are of no interest. The sizes of the other memory components can then be derived from it. See also 'Configuration process' below.

The Job Cache is part of the JVM Heap. It can already be configured by `jobstore.cache-size`. Its size should be checked so that it does not exceed the size of the JVM Heap.
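The check could look roughly like the following minimal sketch. It assumes both sizes are already resolved to bytes; `JobCacheCheck` and `checkJobCacheFitsHeap` are hypothetical names, not Flink's API, and the plain `IllegalArgumentException` stands in for whatever configuration exception the implementation would actually throw.

```java
import java.util.Locale;

// Hypothetical helper illustrating the proposed Job Cache check; not Flink's actual API.
final class JobCacheCheck {

    private JobCacheCheck() {}

    /** Fails if the configured Job Cache (jobstore.cache-size) does not fit into the JVM Heap. */
    static void checkJobCacheFitsHeap(long jobCacheBytes, long jvmHeapBytes) {
        if (jobCacheBytes < 0 || jvmHeapBytes < 0) {
            throw new IllegalArgumentException("Memory sizes must not be negative");
        }
        if (jobCacheBytes > jvmHeapBytes) {
            throw new IllegalArgumentException(String.format(Locale.ROOT,
                    "jobstore.cache-size (%d bytes) exceeds the JVM Heap size (%d bytes)",
                    jobCacheBytes, jvmHeapBytes));
        }
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        checkJobCacheFitsHeap(50 * mb, 1024 * mb); // fits, no exception
        try {
            checkJobCacheFitsHeap(2048 * mb, 1024 * mb); // larger than the heap
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```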

Off-heap Memory

The Off-heap Memory component accounts for all types of native memory usage, including JVM Direct Memory. Therefore, it is also set as the corresponding JVM argument: -XX:MaxDirectMemorySize. The Off-heap Memory in JM can be consumed by the following sources:

  • JM network communication (Akka)
  • Possibly JM external Flink framework dependencies (presumably, either none or negligible at the moment)
  • User code running during job submission in certain cases or in checkpoint completion callbacks

Note: Although we say that the JVM Direct Memory limit is set to the size of the Off-heap Memory, the user docs should explain that this component also covers non-direct memory allocations, e.g. in native code. Here, we trade an exact JVM Direct Memory limit for simplicity, to avoid introducing more components.

Setting the limit for the JVM Direct Memory should improve the container deployment experience and help debug JVM Direct Memory leaks.
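For debugging direct memory leaks against this limit, the standard JDK `BufferPoolMXBean` reports how much memory the JVM currently uses for direct `ByteBuffer`s (the pool is named "direct" on HotSpot). The sketch below reads that pool; the class name `DirectMemoryProbe` is hypothetical. It only sees direct buffers, not other native allocations such as those made by native code, which is why the limit is approximate as noted above.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

// Hypothetical debugging helper; uses only standard JDK management APIs.
final class DirectMemoryProbe {

    private DirectMemoryProbe() {}

    /** Bytes currently used by direct ByteBuffers, or -1 if the JVM reports no "direct" pool. */
    static long directMemoryUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long before = directMemoryUsed();
        // If -XX:MaxDirectMemorySize is too low, this call fails with
        // java.lang.OutOfMemoryError: Direct buffer memory.
        ByteBuffer buffer = ByteBuffer.allocateDirect(8 * 1024 * 1024);
        long after = directMemoryUsed();
        System.out.println("Direct memory used: " + before + " -> " + after
                + " bytes (buffer capacity " + buffer.capacity() + ")");
    }
}
```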

The size of the Off-heap Memory is mostly driven by network communication (the number of running jobs and their structure) and the requirements of the mentioned user code. Therefore, it may be hard to come up with a default value that covers all use cases, so it can be a point of tuning. Preliminarily, it can be set to 128MB.

JVM Metaspace

The JVM Metaspace size is also set via the corresponding JVM argument: -XX:MaxMetaspaceSize. The JVM Metaspace in JM can be consumed by the following sources:

  • Classes loaded by the Flink framework
  • Classes loaded by user code running during job submission in certain cases or in checkpoint completion callbacks

Setting the limit for the JVM Metaspace should improve the container deployment experience and help debug JVM Metaspace leaks.

The size of the JVM Metaspace is mostly driven by the requirements of the mentioned user code. Therefore, it may be hard to come up with a default value that covers all use cases, so it can be a point of tuning. Preliminarily, it can be set to 256MB.
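Analogously, Metaspace usage can be observed through the standard `MemoryPoolMXBean` API, for example to spot a value that keeps growing across job submissions, which would hint at a class loading leak. This minimal sketch assumes a HotSpot JVM, where the pool is named "Metaspace"; the class name `MetaspaceProbe` is hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;

// Hypothetical debugging helper; uses only standard JDK management APIs.
final class MetaspaceProbe {

    private MetaspaceProbe() {}

    /** Usage of the pool named "Metaspace" (HotSpot naming), or null if the JVM has no such pool. */
    static MemoryUsage metaspaceUsage() {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if ("Metaspace".equals(pool.getName())) {
                return pool.getUsage();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        MemoryUsage usage = metaspaceUsage();
        if (usage == null) {
            System.out.println("This JVM does not expose a pool named Metaspace");
            return;
        }
        // getMax() returns -1 when the maximum is undefined; on HotSpot this is
        // typically the case when no explicit -XX:MaxMetaspaceSize is given.
        System.out.println("Metaspace used=" + usage.getUsed()
                + " committed=" + usage.getCommitted() + " max=" + usage.getMax());
    }
}
```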

JVM Overhead

All JVM and other off-heap memory consumption that is not accounted for in the other memory components can be part of the JVM Overhead. It is mostly relevant for the containerised environments, to derive the Total Process Memory and to request the JM container memory size. The JVM Overhead can be configured the same way as for TM: its size is a configured fraction of the Total Process Memory, capped by its configured min and max values. See also 'Configuration process' below.
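A minimal sketch of this capping rule, using the default min (192MB), max (1GB) and fraction (0.1) from the option table; the class `JvmOverheadSketch` is a hypothetical name for illustration, and its argument validation is the sketch's own.

```java
// Hypothetical illustration of the JVM Overhead capping rule; not Flink's actual code.
final class JvmOverheadSketch {

    static final long MB = 1024L * 1024L;

    private JvmOverheadSketch() {}

    /** JVM Overhead = max(min, min(max, fraction * Total Process Memory)), in bytes. */
    static long fromTotalProcessMemory(long totalProcessBytes, long minBytes, long maxBytes, double fraction) {
        if (minBytes > maxBytes) {
            throw new IllegalArgumentException("min must not exceed max");
        }
        if (fraction < 0.0 || fraction >= 1.0) {
            throw new IllegalArgumentException("fraction must be in [0, 1)");
        }
        long relative = (long) (fraction * totalProcessBytes);
        return Math.max(minBytes, Math.min(maxBytes, relative));
    }

    public static void main(String[] args) {
        long min = 192 * MB;
        long max = 1024 * MB;
        for (long processMb : new long[] {1600, 4096, 16384}) {
            long overhead = fromTotalProcessMemory(processMb * MB, min, max, 0.1);
            System.out.println(processMb + "MB process -> " + overhead / MB + "MB JVM Overhead");
        }
    }
}
```

With these bounds, a 1600MB process is lifted to the 192MB minimum (10% would only be 160MB), a 4GB process gets about 409MB, and a 16GB process is capped at 1GB.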

Configuration process

As for TM, there are 3 ways to configure memory for JM, each of which sets the explicit size of one of the following components:

  • Total Process Memory
  • Total Flink Memory
  • JVM Heap

The options that set the explicit sizes of the listed components do not have default values. Users are required to always set one of them.

If one of the 3 options is set, all other components are derived from it according to the following equations:

Total Process Memory = Total Flink Memory + JVM Metaspace + JVM Overhead

Total Flink Memory = JVM Heap + Off-heap Memory

JVM Overhead = Math.max(min, Math.min(max, fraction * Total Process Memory))

See also JVM Overhead for more details about min, max and fraction.
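To make the equations concrete, the following sketch derives all components from whichever of the three sizes is set, using the defaults from the option table below. `JobManagerMemorySketch` and its nested `Spec` holder are hypothetical; `Spec` only loosely corresponds to the data structure mentioned in the implementation steps. When the Total Flink Memory or the JVM Heap is set, the Total Process Memory is not known yet, so the sketch solves the first equation for the JVM Overhead before capping it. This mirrors the TM derivation but is an assumption of this sketch.

```java
// Hypothetical sketch of the JM memory derivation; names and structure are illustrative only.
final class JobManagerMemorySketch {

    static final long MB = 1024L * 1024L;

    // Defaults taken from the option table of this proposal.
    static final long OFF_HEAP = 128 * MB;
    static final long METASPACE = 256 * MB;
    static final long OVERHEAD_MIN = 192 * MB;
    static final long OVERHEAD_MAX = 1024 * MB;
    static final double OVERHEAD_FRACTION = 0.1;

    /** Derived JM memory sizes, in bytes. */
    static final class Spec {
        final long heap;
        final long offHeap;
        final long metaspace;
        final long overhead;

        Spec(long heap, long offHeap, long metaspace, long overhead) {
            this.heap = heap;
            this.offHeap = offHeap;
            this.metaspace = metaspace;
            this.overhead = overhead;
        }

        long totalFlink() { return heap + offHeap; }

        long totalProcess() { return totalFlink() + metaspace + overhead; }
    }

    private static long capOverhead(long relative) {
        return Math.max(OVERHEAD_MIN, Math.min(OVERHEAD_MAX, relative));
    }

    /** Way 1: Total Process Memory is set. */
    static Spec fromTotalProcess(long totalProcess) {
        long overhead = capOverhead((long) (OVERHEAD_FRACTION * totalProcess));
        long heap = totalProcess - overhead - METASPACE - OFF_HEAP;
        if (heap <= 0) {
            throw new IllegalArgumentException("Total Process Memory is too small");
        }
        return new Spec(heap, OFF_HEAP, METASPACE, overhead);
    }

    /** Way 2: Total Flink Memory is set. */
    static Spec fromTotalFlink(long totalFlink) {
        long heap = totalFlink - OFF_HEAP;
        if (heap <= 0) {
            throw new IllegalArgumentException("Total Flink Memory is too small");
        }
        // Solve overhead = fraction * (totalFlink + metaspace + overhead) for overhead, then cap it.
        long base = totalFlink + METASPACE;
        long overhead = capOverhead((long) (base * OVERHEAD_FRACTION / (1 - OVERHEAD_FRACTION)));
        return new Spec(heap, OFF_HEAP, METASPACE, overhead);
    }

    /** Way 3: JVM Heap is set. */
    static Spec fromHeap(long heap) {
        return fromTotalFlink(heap + OFF_HEAP);
    }

    public static void main(String[] args) {
        Spec spec = fromTotalProcess(1600 * MB);
        System.out.println("heap=" + spec.heap / MB + "MB, off-heap=" + spec.offHeap / MB
                + "MB, metaspace=" + spec.metaspace / MB + "MB, overhead=" + spec.overhead / MB
                + "MB, total Flink=" + spec.totalFlink() / MB + "MB");
    }
}
```

With these defaults, a Total Process Memory of 1600MB yields a 1024MB JVM Heap, and setting the JVM Heap to 1024MB derives the same 1600MB back. The sketch assumes exactly one of the three sizes is configured.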

The default flink-conf.yaml, shipped with the Flink distribution, can set the Total Process Memory to e.g. 1600MB. With the default JVM Overhead (its 192MB minimum, since 10% of 1600MB is only 160MB), JVM Metaspace (256MB) and Off-heap Memory (128MB), this results in a JVM Heap of 1600 - 192 - 256 - 128 = 1024MB, which is the value currently set for the `jobmanager.heap.size` option in the default flink-conf.yaml. `jobmanager.heap.size` currently defines the JVM Heap size in the standalone environment.

Introduced configuration options

| Memory component | Option | Default value |
|---|---|---|
| Total Process Memory | `jobmanager.memory.process.size` | None ("1600m" in the default flink-conf.yaml) |
| Total Flink Memory | `jobmanager.memory.flink.size` | None |
| JVM Heap | `jobmanager.memory.heap.size` | None |
| Off-heap Memory | `jobmanager.memory.off-heap.size` | "128m" |
| JVM Metaspace | `jobmanager.memory.jvm-metaspace.size` | "256m" |
| JVM Overhead | `jobmanager.memory.jvm-overhead.min` | "192m" |
| JVM Overhead | `jobmanager.memory.jvm-overhead.max` | "1g" |
| JVM Overhead | `jobmanager.memory.jvm-overhead.fraction` | 0.1 |

Implementation Steps

  1. Introduce new options
  2. Introduce data structures and utilities
    1. Data structure to store memory sizes of JM
    2. Utility for calculating memory sizes from configuration
  3. Extend the calculation utility and BashJavaUtils to generate the JVM arguments for starting the JM process
  4. Call BashJavaUtils in the standalone startup scripts and use the returned JVM arguments to start the JM JVM process (ClusterEntryPoint) instead of the current bash code
  5. Use the new memory calculation utility to get the Total Process Memory size and the JVM arguments to start the JM container (ClusterEntryPoint) in the containerised environments
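Step 3 can be illustrated with a minimal sketch that turns derived sizes into the JVM memory arguments described earlier (-Xmx/-Xms, -XX:MaxDirectMemorySize, -XX:MaxMetaspaceSize). `JvmArgsSketch` and `jvmMemoryArgs` are hypothetical names, and the sketch does not reproduce the actual output format of BashJavaUtils. Sizes are passed as plain byte counts, which the JVM accepts without a unit suffix.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of generating JM JVM memory arguments; not Flink's actual code.
final class JvmArgsSketch {

    static final long MB = 1024L * 1024L;

    private JvmArgsSketch() {}

    /** Builds the JVM memory arguments for the JM process from derived sizes in bytes. */
    static List<String> jvmMemoryArgs(long heapBytes, long offHeapBytes, long metaspaceBytes) {
        List<String> args = new ArrayList<>();
        args.add("-Xmx" + heapBytes); // max heap = JVM Heap
        args.add("-Xms" + heapBytes); // initial heap = JVM Heap
        args.add("-XX:MaxDirectMemorySize=" + offHeapBytes); // limit = Off-heap Memory
        args.add("-XX:MaxMetaspaceSize=" + metaspaceBytes); // limit = JVM Metaspace
        return args;
    }

    public static void main(String[] args) {
        System.out.println(String.join(" ", jvmMemoryArgs(1024 * MB, 128 * MB, 256 * MB)));
    }
}
```

For sizes of 1024MB, 128MB and 256MB it produces `-Xmx1073741824 -Xms1073741824 -XX:MaxDirectMemorySize=134217728 -XX:MaxMetaspaceSize=268435456`.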

Compatibility, Deprecation, and Migration Plan

`jobmanager.heap.size` and `jobmanager.heap.mb` will be deprecated. If set, they will fall back to:

  • the JVM Heap for the standalone environment, and
  • the Total Process Memory for the containerised environments.
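The fallback could be sketched as follows. `LegacyHeapOptionFallback` is a hypothetical name, and two details are assumptions not stated above: the legacy value is only used when none of the three new explicit size options is set, and `jobmanager.heap.size` takes precedence over `jobmanager.heap.mb` (a plain number of megabytes).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the proposed fallback for the deprecated JM heap options.
final class LegacyHeapOptionFallback {

    private static final String[] NEW_SIZE_KEYS = {
        "jobmanager.memory.process.size",
        "jobmanager.memory.flink.size",
        "jobmanager.memory.heap.size"
    };

    private LegacyHeapOptionFallback() {}

    /** Returns a copy of conf in which a legacy heap setting is mapped to its new option. */
    static Map<String, String> applyFallback(Map<String, String> conf, boolean containerised) {
        Map<String, String> result = new HashMap<>(conf);
        for (String key : NEW_SIZE_KEYS) {
            if (conf.containsKey(key)) {
                return result; // assumption: explicitly set new options win over legacy ones
            }
        }
        String legacy = conf.get("jobmanager.heap.size");
        if (legacy == null && conf.containsKey("jobmanager.heap.mb")) {
            // jobmanager.heap.mb holds a plain number of megabytes
            legacy = conf.get("jobmanager.heap.mb").trim() + "m";
        }
        if (legacy != null) {
            String target = containerised ? "jobmanager.memory.process.size" : "jobmanager.memory.heap.size";
            result.put(target, legacy);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("jobmanager.heap.size", "1024m");
        System.out.println("standalone:    " + applyFallback(conf, false));
        System.out.println("containerised: " + applyFallback(conf, true));
    }
}
```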

The container cut-off configuration options (`containerized.heap-cutoff-ratio` and `containerized.heap-cutoff-min`) are no longer used for TM after FLIP-49, but they are still used for the JM container. Once this FLIP is implemented, they will not be needed for the JM either and can be removed.

Test Plan

Add or check sufficient coverage with unit and e2e tests.

Rejected Alternatives

None so far.