...

  • JVM heap memory
    • Includes Framework Heap Memory, Task Heap Memory, and On-Heap Managed Memory
    • Explicitly set both -Xmx and -Xms to this value
  • JVM direct memory
    • Includes Task Off-heap Memory, Network Memory and JVM Overhead
    • Explicitly set -XX:MaxDirectMemorySize to this value
    • Off-heap Managed Memory and Network Memory are always allocated with Unsafe.allocateMemory(), which is not limited by this parameter.
  • JVM metaspace
    • Set -XX:MaxMetaspaceSize to configured JVM Metaspace
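The mapping from memory components to JVM parameters listed above can be sketched as follows. This is a minimal illustration, not the Flink implementation: the class name JvmArgsSketch, the method jvmArgs, and its parameter names are hypothetical, and all sizes are assumed to be plain byte counts.

```java
import java.util.List;

/** Hypothetical sketch: derives JVM memory flags from component sizes given in bytes. */
final class JvmArgsSketch {

    static List<String> jvmArgs(
            long frameworkHeap, long taskHeap, long onHeapManaged,
            long taskOffHeap, long network, long jvmOverhead,
            long metaspace) {
        // JVM heap = Framework Heap + Task Heap + On-Heap Managed Memory.
        long heap = frameworkHeap + taskHeap + onHeapManaged;
        // JVM direct memory = Task Off-heap + Network Memory + JVM Overhead.
        long direct = taskOffHeap + network + jvmOverhead;
        return List.of(
                "-Xmx" + heap,                        // -Xmx and -Xms get the same value
                "-Xms" + heap,
                "-XX:MaxDirectMemorySize=" + direct,
                "-XX:MaxMetaspaceSize=" + metaspace);
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // Example sizes are arbitrary and chosen only for illustration.
        System.out.println(String.join(" ",
                jvmArgs(128 * mb, 384 * mb, 256 * mb, 64 * mb, 128 * mb, 192 * mb, 96 * mb)));
    }
}
```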

...

  • All memory / pool size calculations take place before the task executor JVM is started. Once the JVM is started, there should be no further calculation or derivation inside the Flink TaskExecutor.
  • The calculations should be performed in two places only.
    • In the startup shell scripts, for standalone.
    • On the resource manager side, for Yarn/Mesos/K8s.
  • The startup scripts can call java with the Flink runtime code to execute the calculation logic. This ensures that standalone clusters and active-mode clusters use consistent memory calculation logic.
  • The calculated memory / pool sizes are passed into the task executor as dynamic configurations (via '-D').
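As an illustration of passing pre-calculated sizes via '-D', the sketch below renders a map of sizes as command-line arguments and parses them back on the receiving side. The class DynamicConfigSketch, its methods, and the configuration keys used in main are hypothetical placeholders, not actual Flink option names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: round-trips calculated sizes through "-Dkey=value" arguments. */
final class DynamicConfigSketch {

    /** Renders each entry as a "-Dkey=value" argument, preserving iteration order. */
    static List<String> toDynamicArgs(Map<String, String> sizes) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> e : sizes.entrySet()) {
            out.add("-D" + e.getKey() + "=" + e.getValue());
        }
        return out;
    }

    /** Parses "-Dkey=value" arguments back into a map; other arguments are ignored. */
    static Map<String, String> fromDynamicArgs(List<String> args) {
        Map<String, String> parsed = new LinkedHashMap<>();
        for (String arg : args) {
            int eq = arg.indexOf('=');
            if (arg.startsWith("-D") && eq > 2) {
                parsed.put(arg.substring(2, eq), arg.substring(eq + 1));
            }
        }
        return parsed;
    }

    public static void main(String[] args) {
        Map<String, String> sizes = new LinkedHashMap<>();
        sizes.put("example.task.heap.size", "402653184");    // hypothetical key
        sizes.put("example.managed.memory.size", "268435456"); // hypothetical key
        System.out.println(toDynamicArgs(sizes));
    }
}
```

Because the task executor only reads the values back, no size is recomputed after the JVM has started.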

Calculation Logic

One of the following three options must be configured.

...

  • Introduce new configuration options
  • Introduce TaskExecutorSpecifics
    • Data structure to store memory / pool sizes of task executor
    • Utility for calculating memory / pool sizes from configuration
    • Utility for generating dynamic configurations
    • Utility for generating JVM parameters

...

  • Invoke TaskExecutorSpecifics to generate JVM parameters and dynamic configurations for launching new task executors.
    • In startup scripts
    • In resource managers
  • Task executor uses TaskExecutorSpecifics to set memory pool sizes and slot resource profiles.
    • MemoryManager
    • ShuffleEnvironment
    • TaskSlotTable

...

Implement this step in common code paths for the legacy / new mode. For the legacy mode, depending on the configured memory type, we can set one of the two pools to the managed memory size and always allocate from this pool, leaving the other pool empty.
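The legacy-mode pool assignment described above can be sketched as follows. The class LegacyPoolSketch, its nested MemoryType enum, and the method forLegacyMode are hypothetical names used only for illustration.

```java
/** Hypothetical sketch: sizes the on-heap and off-heap managed memory pools in legacy mode. */
final class LegacyPoolSketch {

    /** Stand-in for the configured memory type; defined locally for this sketch. */
    enum MemoryType { HEAP, OFF_HEAP }

    final long onHeapPoolSize;
    final long offHeapPoolSize;

    private LegacyPoolSketch(long onHeapPoolSize, long offHeapPoolSize) {
        this.onHeapPoolSize = onHeapPoolSize;
        this.offHeapPoolSize = offHeapPoolSize;
    }

    /** Gives the whole managed memory size to the pool matching the type; the other pool stays empty. */
    static LegacyPoolSketch forLegacyMode(MemoryType type, long managedMemorySize) {
        if (managedMemorySize < 0) {
            throw new IllegalArgumentException("managedMemorySize must be non-negative");
        }
        return type == MemoryType.HEAP
                ? new LegacyPoolSketch(managedMemorySize, 0L)
                : new LegacyPoolSketch(0L, managedMemorySize);
    }

    public static void main(String[] args) {
        LegacyPoolSketch pools = forLegacyMode(MemoryType.OFF_HEAP, 256L * 1024 * 1024);
        System.out.println("on-heap=" + pools.onHeapPoolSize + ", off-heap=" + pools.offHeapPoolSize);
    }
}
```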

Step 5.

...

  • Disable pre-allocation.
  • Extend MemoryManager to support memory reserving.

Implement this step in common code paths for the legacy / new mode. This introduces a behavior change to the legacy mode by disabling pre-allocation, which only affects performance without breaking any functionality.
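One possible reading of "memory reserving" is budget accounting without allocation: a consumer claims part of the managed memory budget and allocates the memory itself later. The sketch below illustrates that reading only; the class ReservationBudgetSketch and its methods are hypothetical, and the actual MemoryManager extension may differ.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: tracks reserved bytes against a fixed budget without allocating memory. */
final class ReservationBudgetSketch {

    private final long totalBytes;
    private final AtomicLong reservedBytes = new AtomicLong();

    ReservationBudgetSketch(long totalBytes) {
        if (totalBytes < 0) {
            throw new IllegalArgumentException("totalBytes must be non-negative");
        }
        this.totalBytes = totalBytes;
    }

    /** Reserves the given number of bytes if the budget allows it; nothing is allocated here. */
    boolean tryReserve(long bytes) {
        if (bytes < 0) {
            throw new IllegalArgumentException("bytes must be non-negative");
        }
        while (true) {
            long current = reservedBytes.get();
            if (bytes > totalBytes - current) {
                return false; // not enough budget left
            }
            if (reservedBytes.compareAndSet(current, current + bytes)) {
                return true;
            }
        }
    }

    /** Returns previously reserved bytes to the budget. */
    void release(long bytes) {
        if (bytes < 0) {
            throw new IllegalArgumentException("bytes must be non-negative");
        }
        if (reservedBytes.addAndGet(-bytes) < 0) {
            reservedBytes.addAndGet(bytes); // undo the invalid release
            throw new IllegalStateException("released more than was reserved");
        }
    }

    long availableBytes() {
        return totalBytes - reservedBytes.get();
    }
}
```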

...

Use native memory for

...

managed memory.

  • Allocate memory with Unsafe.allocateMemory
    • MemoryManager
    • ShuffleEnvironment

Implement this step in common code paths for the legacy / new mode. This should only affect the GC behavior.
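For readers unfamiliar with Unsafe.allocateMemory, the following minimal sketch allocates, uses, and frees a small block of native memory. It assumes a JVM where sun.misc.Unsafe is still reachable through reflection (recent JDKs emit deprecation warnings for these methods); the class UnsafeAllocationSketch and its helper methods are hypothetical and unrelated to Flink's own code.

```java
import java.lang.reflect.Field;
import sun.misc.Unsafe;

/** Hypothetical sketch: allocates native memory with Unsafe.allocateMemory and frees it explicitly. */
final class UnsafeAllocationSketch {

    /** Obtains the Unsafe singleton via reflection; assumes the field "theUnsafe" is accessible. */
    static Unsafe unsafe() {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return (Unsafe) field.get(null);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("sun.misc.Unsafe is not available", e);
        }
    }

    /** Writes a long into freshly allocated native memory and reads it back. */
    static long roundTrip(long value) {
        Unsafe unsafe = unsafe();
        long address = unsafe.allocateMemory(Long.BYTES);
        try {
            unsafe.putLong(address, value);
            return unsafe.getLong(address);
        } finally {
            // Native memory is not managed by the garbage collector and must be freed manually.
            unsafe.freeMemory(address);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(42L));
    }
}
```

Memory obtained this way lives outside the Java heap, so it is never scanned or moved by the garbage collector.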

Step

...

6. Clean-up of legacy mode.

  • Fix / update / remove test cases for legacy mode.
  • Deprecate / remove legacy config options.
  • Remove legacy code paths.
  • Remove the switch for legacy / new mode.

...