You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

Status

Current state: Under Discussion

Discussion threadhere (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA:

Released:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This proposal addresses several shortcomings of current (Flink 1.9) Flink TaskExecutor memory configuration.

(1) Different configuration for Streaming and Batch.

Currently, TaskExecutor memory is configured differently for streaming and batch jobs.

  • Streaming
    • Memory is implicitly consumed, either on-heap by memory state backend, or off-heap by RocksDB.
    • Users have to manually align heap size and choice of state backend.
    • Users have to manually configure RocksDB to use enough memory for good performance, but not too much to exceed the budget.
    • No predictability in the memory consumption, neither on-heap by memory state backend, nor off-heap by RocksDB.
  • Batch
    • Users configure total memory size, and whether to use on-heap or off-heap memory in operators.
    • Flink reserves a fraction of the total memory as managed memory. It adjusts the heap size and “max direct memory” parameters automatically to account for managed memory on-heap or off-heap.
    • Flink allocates memory segments for the managed memory, to be used by operators. It guarantees that the reserved memory segments are never exceeded.

(2) Complex and difficult configuration of RocksDB in Streaming

  • Users have to manually decrease the JVM heap size, or setting Flink to use off-heap memory. 
  • Users have to manually configure the RocksDB memory.
  • No way for users to use available memory as much as possible, because RocksDB memory size has to be configured conservatively low enough to make sure the memory budget is not exceeded.

(3) Complicated, uncertain and hard to understand

  • There are some “magic” when determining sizes of containers, processes. Some of these are not easy to reason about, for example “cutoff ratio” on Yarn and with other containers.
  • Configuring an off-heap state backend like RocksDB means either also setting managed memory to off-heap or adjusting the cutoff ratio, to dedicate less memory to the JVM heap.
  • TaskExecutor relies on instantaneous JVM memory usage for determining sizes of different memory pools, by first triggering a GC and then obtaining JVM free memory size, which introduces uncertainty to sizes of different memory pools.

Public Interfaces

Task executor memory configuration options. (See Memory Pools and Configuration Keys)

Proposed Changes

Unifying Managed Memory for Batch and Streaming

The basic idea is to consider memory used by state backends as part of managed memory, and extend memory manager so that state backends can simply reserve certain amount of memory from it but not necessarily allocate the memory from it. In this way, users should be able to switch between streaming and batch jobs, without having to modify the cluster configurations.

Memory Use Cases

  • Streaming jobs with Memory/FsStateBackend
    • JVM on-heap memory
    • Implicitly allocated by the state backend
    • FTM, no control on the overall memory consumption
  • Streaming jobs with RocksDBStateBackend
    • Off-heap memory
    • Implicitly allocated by the state backend
    • Cannot exceed total memory size, which is configured during initialization
  • Batch jobs
    • Off-heap memory
    • Explicitly allocated from the memory manager
    • Cannot exceed total memory allocated from memory manager

Unifying Explicit and Implicit Memory Allocation

  • Memory consumers can acquire memory in two ways
    • Explicitly acquire from MemoryManager, in the form of MemorySegment.
    • Reserve from MemoryManager, in which case should return “use up to X bytes”, and implicitly allocate the memory by the consumer itself.
  • MemoryManager never pre-allocate any memory pages, so that we keep the managed memory budget available for both allocation from MemoryManager and allocation directly from memory consumers.
  • For off-heap memory explicitly acquired from MemoryManager, Flink always allocate with Unsafe.allocateMemory(), which is not limited by the JVM -XX:MaxDirectMemorySize parameter.
    • This eliminates the uncertainty about how many off-heap managed memory should be accounted for JVM max direct memory. 
    • The fallback is that Unsafe is no longer supported in Java 12.

??? MemorySegment ???

It’s an open question that how memory buffers should be returned from MemoryManager in the cases of explicit allocations. 

  • Currently (Flink 1.9), memory buffers are returned as a list of MemorySegments, each wraps a memory buffer with the same configured page size.
  • An alternative could be to return one continuous buffer of the requested size.

In the current way, MemoryManager can flexibly assign pre-allocated MemorySegments to satisfy requirements with different memory amount, without having to release and re-allocate memory buffers. Since MemoryManager no longer supports pre-allocation, this is not a strong advantage. The fallback is that, the segment division among allocated memory may not be a good fit to how the consumers want to use the memory.

Separate On-Heap and Off-Heap Memory Pools for Managed Memory

Currently (Flink 1.9), all managed memory are allocated with the same type, either on-heap or off-heap. This is good with the current use cases, where we do not necessary need both on-heap and off-heap managed memory in the same task executor.

With the design in this proposal, memory usage of state backends is also considered as managed memory, which means we may have scenarios where jobs in the same cluster need different types of managed memory. E.g., a streaming job with MemoryStateBackend and another streaming job with RocksDBStateBackend.

Therefore, we separate the managed memory pool into the on-heap pool and the off-heap pool. We use an off-heap fraction to decide what fraction of managed memory should go into the off-heap pool, and leave the rest to the on-heap pool. Users can still configure the cluster to use all on-heap / off-heap managed memory by setting the off-heap fraction to 0 / 1.

Memory Pools and Configuration Keys

Framework Heap Memory

On-heap memory for the Flink task manager framework. It is not accounted for slot resource profiles.

(taskmanager.memory.framework.heap)

(default 128mb)

Task Heap Memory

On-heap memory for user code.

(taskmanager.memory.task.heap)

Task Off-Heap Memory

Off-heap memory for user code.

(taskmanager.memory.task.offheap

(default 0b)

Network Memory

Off-heap memory for network buffers.

(taskmanager.memory.network.[min/max/fraction]) or (taskmanager.network.memory.[min/max/fraction])

(default min=64mb, max=1gb, fraction=0.1)

Managed Memory

On-heap and off-heap Flink managed memory.

(taskmanager.memory.managed.[size|fraction])

(taskmanager.memory.managed.offheap-fraction)

(default fraction=0.5, offheap-fraction=0.0)

On-Heap Managed Memory = Managed Memory * (1 - offheap-fraction)

Off-Heap Managed Memory = Managed Memory * offheap-fraction

JVM Metaspace

Off-heap memory for JVM metaspace.

(taskmanager.memory.jvm-metaspace)

(default 192mb)

JVM Overhead

Off-heap memory for thread stack space, I/O direct memory, compile cache, etc.

(taskmanager.memory.jvm-overhead.[min/max/fraction])

(default min=128mb, max=1gb, fraction=0.1)

Total Flink Memory

Coarser config option for total flink memory, to make it easily configurable for users.

This includes Framework Heap Memory, Task Heap Memory, Task Off-Heap Memory, Network Memory, and Managed Memory.

This excludes JVM Metaspace and JVM Overhead.

(taskmanager.memory.size)

Total Process Memory

Coarser config option for everything, to make it easily configurable for users.

This includes Total Flink Memory, and JVM Metaspace and JVM Overhead.

(taskmanager.memory.process.size)

JVM Parameters

  • JVM heap memory
    • Includes Framework Heap Memory, Task Heap Memory, and On-Heap Managed Memory
    • Explicitly set both  -Xmx and -Xms to this value
  • JVM metaspace
    • Set -XX:MaxMetaspaceSize to configured JVM Metaspace


??? JVM direct memory ???

  • It’s an open question whether and how we set JVM -XX:MaxDirectMemorySize parameter
  • Off-Heap Managed Memory are allocated through Unsafe.allocateMemory(), and we can do the same thing for Network Memory. Then the max direct memory size parameter should only affect Task Off-Heap Memory and JVM Overhead.
  • Netty uses direct memory. Although in most cases it’s only tens of megabytes per task executor, it is possible that in some corner cases this could grow up to hundreds of megabytes.


Alternative 1: 

Do not set max direct memory size. Leave it to JVM default, which is the same as max heap size. Normally this should be enough for JVM Overhead, and Task Off-Heap Memory if there is not too many. The fallback is that, in cases where the user codes use significant direct memory, users need to manually set large max direct memory through  env.java.opts.

Alternative 2: 

Set max direct memory size strictly to the sum of configured Task Off-Heap Memory and JVM Overhead, so the users never need to manually configure it. It also guarantees that direct memory usage can never exceed the limit, and we get descriptive exceptions when it tries to. The fallback is that both Task Off-Heap Memory and JVM Overhead are usually empirically configured and may not be accurate. Thus it is likely to result in either instability due to direct OOM or low memory utility due to over reservation of Task Off-Heap Memory and JVM Overhead.

Alternative 3: 

Set max direct memory size to a very large value, which is unlikely to be reached. This also never requires users to manually set the JVM parameter. It will not have the stability issue due to over allocating direct memory, neither low memory utility due to over reservation. The fallback is that if Task Off-Heap Memory or JVM Overhead unexpectedly use large amount of direct memory, which may lead to memory overuse of a containerized task executor and get killed by the external resource management system, we cannot get a descriptive exception to raise the problem.

Memory Calculations

  • All the memory / pool size calculations take place before the task executor JVM is started. Once JVM is started, there should be no further calculations and deriving inside Flink TaskExecutor. 
  • The calculations should be performed in two places only.
    • In the startup shell scripts, for standalone.
    • On the resource manager side, for Yarn/Mesos/K8s.
  • The startup scripts can actually call java with the Flink runtime code to execute the calculation logics. In this way, we can make sure that standalone cluster and other active mode clusters have consistent memory calculation logics.
  • The calculated memory / pool sizes are passed into the task executor as environment variables.

Calculation Logics

We need either of these three options configured.

  • Task Heap Memory and Managed Memory
  • Total Flink Memory
  • Total Process Memory

The following logic describes how to derive these values from another.

  • If both Task Heap Memory and Managed Memory are configured, we use these to derive Total Flink Memory
    • If Network Memory is configured explicitly, we use that value
    • Otherwise, we compute it such that it makes up the configured fraction of the final Total Flink Memory (see getAbsoluteOrInverseFraction())
  • If Total Flink Memory is configured, but not Task Heap Memory and Managed Memory, then we derive Network Memory and Managed Memory, and leave the rest (excluding Framework Heap Memory and Task Off-Heap Memory) as Task Heap Memory.
    • If Network Memory is configured explicitly, we use that value
    • Otherwise we compute it such that it makes up the configured fraction of the Total Flink Memory (see getAbsoluteOrFraction())
    • If Managed Memory is configured explicitly, we use that value
    • Otherwise we compute it such that it makes up the configured fraction of the Total Flink Memory (see getAbsoluteOrFraction())
  • If only the Total Process Memory is configured, we derive the Total Flink Memory in the following way
    • We get (or compute relative) and subtract the JVM Overhead from Total Process Memory (see getAbsoluteOrFraction())
    • We subtract JVM Metaspace from the remaining
    • We leave the rest as Total Flink Memory

def getAbsoluteOrFraction(key: ConfigOption, base: Long): Long = {

    conf.getOrElse(key) {

        val (min, max, fraction) = getRange(conf, key)

        val relative = fraction * base

        Math.max(min, Math.min(relative, max))

    }

}

def getAbsoluteOrInverseFraction(key: ConfigOption, base: Long): Long = {

    conf.getOrElse(key) {

        val (min, max, fraction) = getRange(conf, key)

        val relative = fraction / (1 - fraction) * base

        Math.max(min, Math.min(relative, max))

    }

}

Compatibility, Deprecation, and Migration Plan

  • This FLIP changes how users configure cluster resources, which in some cases may require re-configuring of cluster if migrated from prior versions

Test Plan

  • We need to update existing and add new integration tests dedicated to validate the new memory configuration behaviors.
  • It is also expected that other regular integration and end-to-end tests should fail if this is broken.

Rejected Alternatives

Alternatives regarding MemorySegment and max direct memory are still under open discussion.

  • No labels