...

  • Memory consumers can acquire memory in two ways (see the interface sketch below):
    • Explicitly acquire from the MemoryManager, in the form of MemorySegments.
    • Reserve from the MemoryManager, in which case the MemoryManager only grants a budget ("use up to X bytes") and the consumer implicitly allocates the memory itself.
  • The MemoryManager never pre-allocates any memory pages, so that the managed memory budget remains available both for allocation from the MemoryManager and for allocation directly by memory consumers.
  • For off-heap memory explicitly acquired from the MemoryManager, Flink always allocates with Unsafe.allocateMemory(), which is not limited by the JVM -XX:MaxDirectMemorySize parameter.
    • This eliminates the uncertainty about how much off-heap managed memory should be accounted for in the JVM max direct memory.
    • The drawback is that Unsafe is no longer supported in Java 12.
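
To make the two acquisition modes concrete, here is a minimal, hypothetical interface sketch; the type, method, and exception names are illustrative and not Flink's actual API.

import java.util.List;

/** Placeholder for Flink's page-sized memory buffer abstraction. */
class MemorySegment {}

class MemoryAllocationException extends Exception {}
class MemoryReservationException extends Exception {}

/** Hypothetical interface illustrating the two acquisition modes. */
interface ManagedMemoryManager {

    /**
     * Explicit acquisition: the manager allocates page-sized segments
     * itself and hands them to the consumer.
     */
    List<MemorySegment> allocatePages(Object owner, int numberOfPages)
            throws MemoryAllocationException;

    /**
     * Reservation: pure bookkeeping ("use up to sizeInBytes bytes");
     * the consumer allocates the memory itself, e.g. natively.
     */
    void reserveMemory(Object owner, long sizeInBytes)
            throws MemoryReservationException;

    /** Returns everything held by this owner to the shared budget. */
    void releaseAll(Object owner);
}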

MemorySegment

It is an open question how memory buffers should be returned from the MemoryManager in the case of explicit allocations.

  • Currently (Flink 1.9), memory buffers are returned as a list of MemorySegments, each wrapping a memory buffer of the configured page size.
  • An alternative could be to return one contiguous buffer of the requested size.

With the current approach, the MemoryManager could flexibly assign pre-allocated MemorySegments to satisfy requests for different memory amounts, without having to release and re-allocate memory buffers. Since the MemoryManager no longer supports pre-allocation, this is no longer a strong advantage. The drawback is that the division into fixed-size segments may not fit how consumers want to use the memory, as the example below illustrates.
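
A small standalone sketch of how a request maps onto page-sized segments under the current approach; the page and request sizes are hypothetical.

// A request is rounded up to whole pages and served as a list of equally
// sized segments; the consumer must stitch them together itself.
public class PagedAllocation {
    public static void main(String[] args) {
        final int pageSize = 32 * 1024;     // configured page size: 32 KiB
        final long requested = 1_000_000L;  // consumer asks for ~1 MB

        long numPages = (requested + pageSize - 1) / pageSize; // round up
        System.out.printf(
                "request: %d bytes -> %d segments of %d bytes (%d bytes total)%n",
                requested, numPages, pageSize, numPages * pageSize);
        // Prints: request: 1000000 bytes -> 31 segments of 32768 bytes (1015808 bytes total)
        // The alternative would instead return one contiguous 1000000-byte buffer.
    }
}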

Separate On-Heap and Off-Heap Memory Pools for Managed Memory

...

  • JVM heap memory
    • Includes Framework Heap Memory, Task Heap Memory, and On-Heap Managed Memory
    • Explicitly set both -Xmx and -Xms to this value
  • JVM metaspace
    • Set -XX:MaxMetaspaceSize to configured JVM Metaspace

  • JVM direct memory
    • Includes Task Off-Heap Memory and JVM Overhead
    • It is an open question whether and how we should set the JVM -XX:MaxDirectMemorySize parameter to this value; the alternatives below address this.
      • Off-Heap Managed Memory is allocated through Unsafe.allocateMemory(), and we can do the same thing for Network Memory. Then the max direct memory size parameter would only affect Task Off-Heap Memory and JVM Overhead (see the sketch below).
      • Netty uses direct memory. Although in most cases this is only tens of megabytes per task executor, in some corner cases it could grow to hundreds of megabytes.
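
To illustrate why Unsafe-based allocations need not be accounted for in the direct memory limit, here is a standalone sketch; the sizes and the flag value are illustrative, and reflective access to sun.misc.Unsafe works on Java 8 but may require extra JVM flags on newer versions.

import java.lang.reflect.Field;
import java.nio.ByteBuffer;

// Contrast of the two off-heap allocation paths. ByteBuffer.allocateDirect()
// is charged against -XX:MaxDirectMemorySize; Unsafe.allocateMemory() is not.
// Run with, e.g., -XX:MaxDirectMemorySize=64m to observe the difference.
public class DirectLimitContrast {
    public static void main(String[] args) throws Exception {
        // Charged against the limit: with -XX:MaxDirectMemorySize=64m, sizes
        // above 64 MB fail with "OutOfMemoryError: Direct buffer memory".
        ByteBuffer counted = ByteBuffer.allocateDirect(16 * 1024 * 1024);

        // Not charged against the limit: succeeds regardless of the flag.
        Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
        f.setAccessible(true);
        sun.misc.Unsafe unsafe = (sun.misc.Unsafe) f.get(null);
        long address = unsafe.allocateMemory(128L * 1024 * 1024); // 128 MB
        try {
            unsafe.setMemory(address, 128L * 1024 * 1024, (byte) 0); // touch it
        } finally {
            unsafe.freeMemory(address); // must be released manually
        }
        System.out.println("allocated " + counted.capacity()
                + " counted bytes and 128 MB of uncounted memory");
    }
}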

Alternative 1: 

Do not set the max direct memory size; leave it at the JVM default, which equals the max heap size. Normally this is enough for JVM Overhead, and for Task Off-Heap Memory if it is not too large. The drawback is that, in cases where user code uses significant direct memory, users need to manually set a larger max direct memory size through env.java.opts.

Alternative 2: 

Set the max direct memory size strictly to the sum of the configured Task Off-Heap Memory and JVM Overhead, so that users never need to configure it manually. It also guarantees that direct memory usage can never exceed the limit, and we get a descriptive exception when it tries to (see the demo below). The drawback is that both Task Off-Heap Memory and JVM Overhead are usually configured empirically and may not be accurate. Thus this is likely to result in either instability due to direct OOM or low memory utilization due to over-reservation of Task Off-Heap Memory and JVM Overhead.

Alternative 3: 

Set the max direct memory size to a very large value that is unlikely to be reached. This also never requires users to set the JVM parameter manually. It avoids both the instability caused by direct OOM and the low memory utilization caused by over-reservation. The drawback is that if Task Off-Heap Memory or JVM Overhead unexpectedly uses a large amount of direct memory, a containerized task executor may exceed its memory budget and be killed by the external resource management system, without a descriptive exception to surface the problem.
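
As a standalone illustration of the descriptive exception under Alternative 2, running the following with a strict limit such as -XX:MaxDirectMemorySize=128m (the value is hypothetical) fails fast with "java.lang.OutOfMemoryError: Direct buffer memory" rather than silently exceeding a container budget.

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Keeps allocating direct buffers until the strict limit is hit, at which
// point the JVM raises a descriptive OutOfMemoryError instead of the task
// executor being killed opaquely by the resource management system.
public class StrictLimitDemo {
    public static void main(String[] args) {
        List<ByteBuffer> buffers = new ArrayList<>(); // hold references so nothing is freed
        int mb = 0;
        while (true) {
            buffers.add(ByteBuffer.allocateDirect(16 * 1024 * 1024)); // 16 MB each
            mb += 16;
            System.out.println("allocated " + mb + " MB of direct memory");
        }
    }
}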


Memory Calculations

  • All the memory / pool size calculations take place before the task executor JVM is started. Once the JVM is started, there should be no further calculation or derivation inside the Flink TaskExecutor.
  • The calculations should be performed in two places only.
    • In the startup shell scripts, for standalone.
    • On the resource manager side, for Yarn/Mesos/K8s.
  • The startup scripts can call java with the Flink runtime code to execute the calculation logic. In this way, we can make sure that standalone clusters and the active-mode clusters have consistent memory calculation logic.
  • The calculated memory / pool sizes are passed into the task executor as environment variables. A sketch of this step follows the list.
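
A minimal sketch of that calculation step; the sizes and the environment variable name are hypothetical, and the real logic lives in Flink's startup scripts and resource manager integrations.

// Derives the JVM parameters once, up front, from the configured memory
// pool sizes and prints them in an environment-variable style, as a
// startup script might.
public class TaskExecutorJvmArgs {
    public static void main(String[] args) {
        long frameworkHeapMb = 128;
        long taskHeapMb = 1024;
        long onHeapManagedMb = 512;
        long metaspaceMb = 96;

        // JVM heap = Framework Heap + Task Heap + On-Heap Managed Memory.
        long heapMb = frameworkHeapMb + taskHeapMb + onHeapManagedMb;

        String jvmArgs = String.format(
                "-Xmx%dm -Xms%dm -XX:MaxMetaspaceSize=%dm",
                heapMb, heapMb, metaspaceMb);

        // Handed to the task executor process, e.g. as an environment variable.
        System.out.println("FLINK_TM_JVM_ARGS=" + jvmArgs);
    }
}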

...

  • We need to update existing integration tests, and add new ones, dedicated to validating the new memory configuration behaviors.
  • It is also expected that other regular integration and end-to-end tests should fail if this is broken.

Rejected Alternatives

Regarding JVM direct memory, we considered the following alternatives.

  1. Have MemorySegments de-allocated by the GC, and trigger GC by setting a proper JVM max direct memory size parameter.
  2. Have MemorySegments de-allocated by the GC, and trigger GC by dedicated bookkeeping independent from the JVM max direct memory size parameter.
  3. Manually allocate and de-allocate MemorySegments.

We decided to go with option 3; however, depending on how segfault-safe it turns out to be, we may switch to one of the other alternatives after the implementation. Alternatives regarding MemorySegment and max direct memory are still under open discussion. The sketch below illustrates the manual lifecycle of option 3.
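
A rough sketch of what option 3 implies; the names are illustrative, not Flink's implementation. The segment owns a raw address and must be freed exactly once; an unguarded use-after-free would crash the whole JVM with a segmentation fault, which is the safety concern above.

// Minimal sketch of option 3: a segment backed by manually managed native
// memory, released explicitly rather than by the GC.
public class ManualMemorySegment {
    private static final sun.misc.Unsafe UNSAFE = loadUnsafe();

    private long address; // becomes 0 once freed
    private final long size;

    ManualMemorySegment(long size) {
        this.size = size;
        this.address = UNSAFE.allocateMemory(size); // manual allocation
    }

    byte get(long offset) {
        if (address == 0) {
            // Guarding every access is what segfault safety costs here;
            // accessing freed native memory would crash the process.
            throw new IllegalStateException("segment already freed");
        }
        return UNSAFE.getByte(address + offset);
    }

    void free() {
        if (address != 0) {
            UNSAFE.freeMemory(address); // manual de-allocation, no GC involved
            address = 0;                // prevent double-free
        }
    }

    private static sun.misc.Unsafe loadUnsafe() {
        try {
            java.lang.reflect.Field f =
                    sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            return (sun.misc.Unsafe) f.get(null);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}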