You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 21 Next »

Discussion threadhere (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
Vote threadhere (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
JIRAhere (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Release<Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When using Flink, users may encounter the following issues that affect usability. In order to improve usability, this FLIP mainly proposes optimization solutions for the two issues.

  1. The job may fail with an "Insufficient number of network buffers" exception.

When running a job, users may encounter an "Insufficient number of network buffers" exception with the default configurations. To solve this issue, multiple memory config options should be adjusted and the config option adjustment requires understanding the detailed internal implementation, which is impractical for most users.

One of the major problems of the exception is that the memory footprint is unpredictable because the memory footprint is linearly related to job parallelism. To reduce the probability of the exception, the core idea is to decouple memory footprint from job parallelism as much as possible.

Flink Shuffle network memory footprint consists of two parts, the Shuffle write memory and the Shuffle read memory. The Shuffle write memory is used for ResultPartitionWriter to write data, and the Shuffle read memory is used for InputGate to read data.

For Shuffle write memory, Sort Merge Shuffle mode has decoupled the write memory size from parallelism, and the memory size in Hybrid Shuffle and Pipeline Shuffle mode is still related to parallelism. Decoupling the write memory footprint from the job parallelism for Pipelined and Hybrid Shuffle requires changes to the core data structures of these shuffle implementations, which we'd like to exclude from the scope of this FLIP and address as future follow-ups. 

For Shuffle read memory,  the read buffer in InputGate is mainly divided into two parts, Floating Buffers, and Exclusive Buffers. 

TotalBuffersPerGate = ExclusiveBuffersPerChannel * numChannels + FloatingBuffersPerGate.

The numChannels is the number of InputChannels. ExclusiveBuffersPerChannel is determined by taskmanager.network.memory.buffers-per-channel, which is 2 by default. FloatingBuffersPerGate ranges from 1 to DefaultFloatingBuffersPerGate. DefaultFloatingBuffersPerGate is determined by taskmanager.network.memory.floating-buffers-per-gate, which is 8 by default. Therefore, the range of TotalBuffersPerGate for each InputGate is [ExclusiveBuffersPerChannel * numChannels + 1, ExclusiveBuffersPerChannel * numChannels + DefaultFloatingBuffersPerGate], i.e., the total number of read buffers in an InputGate is bound to and positively related to the number of parallelisms. When the parallelism is high, an "Insufficient number of network buffers" exception may be thrown. In general, batch jobs may run with higher parallelism, making this exception easier to be thrown.

In order to solve the above issue during Shuffle read, our target is that even if the user does not adjust the memory config options, in most scenarios, the probability of the "Insufficient number of network buffers" exception can be significantly reduced, and the job performance will not be affected.

      2. Flink network memory size adjustment is complex

Currently, several config options may influence Flink network memory size, including the total memory size (taskmanager.memory.flink.size), the network memory fraction (taskmanager.memory.network.fraction), the minimum network memory size (taskmanager.memory.network.min) and the maximum network memory size (taskmanager.memory.network.max). When adjusting these configuration options, the options may affect each other, which is confusing to users. The following is a simple example to illustrate this issue.

For example, assume that the total memory of taskmanager is 8g, and taskmanager.memory.network.fraction is 0.1 by default, that is, Flink network memory is 0.8g. If a user expects to increase the network memory to 1.6g, the user increases the network memory fraction to 0.2. However, since taskmanager.memory.network.max is 1g by default, the network memory size is actually only 1g, which is inconsistent with the user's expectation. The user will increase  taskmanager.memory.network.max to at least 1.6g, and the network memory will indeed increase to 1.6g as expected.

taskmanager.memory.network.max is designed to prevent network memory from increasing unlimitedly when the total task manager memory increases. However, the 1g default value is not always desired and complicates the network memory tuning. 

This memory maximum limit can be disabled by default so that the network memory size increases with the fraction, making it easier for users to control the network memory size. 


Public Interfaces

  • Add a new config option.
    • Key
      • taskmanager.memory.network.read-buffer.required-per-gate.max
    • Default
      • (none).
    • Type
      • Integer.
    • Description
      • The maximum number of network read buffers that are required by an input gate. (An input gate is responsible for reading data from all subtasks of an upstream task.) The number of buffers needed by an input gate is dynamically calculated in runtime, depending on various factors (e.g., the parallelism of the upstream task). Among the calculated number of needed buffers, the part below this configured value is required, while the excess part, if any, is optional. A task will fail if the required buffers cannot be obtained in runtime. A task will not fail due to not obtaining optional buffers, but may suffer a performance reduction. If not explicitly configured, the default value is Integer.MAX_VALUE for streaming workloads, and 1000 for batch workloads.
    • In the first version, this new config will be annotated as @Experimental. We will revisit and try to remove the annotation in the next version.
  • Deprecated 2 existing config options
    • taskmanager.network.memory.buffers-per-channel
    • taskmanager.network.memory.floating-buffers-per-gate
    • We will annotate these 2 configs as @Deprecated until the @Experimental annotation for taskmanager.memory.network.read-buffer.required-per-gate.max is removed.
  • Modify the default value of taskmanager.memory.network.max from 1g to MemorySize.MAX_VALUE.

Proposed Changes

Flink Shuffle reading buffers for InputGate

Shuffle read memory shortage mainly occurs when running a job with high parallelism. According to the current runtime state, the number of required buffers for each InputGate is at least ExclusiveBuffersPerChannel * numChannels + 1. In order to not affect the performance but also reduce the possibility of insufficient network memory exceptions, a threshold of the buffer number, numFloatingBufferThreshold, is introduced. 

  • When ExclusiveBuffersPerChannel * numChannels <= numFloatingBufferThreashold

    • All read buffers in InputGate contain Floating Buffers and Exclusive Buffers.

    • ExclusiveBuffersPerChannel is 2 by default, which is consistent with the current state.

    • FloatingBuffersPerGate is within the range of [1, DefaultFloatingBuffersPerGate] by default, consistent with the current state.

  • When ExclusiveBuffersPerChannel * numChannels > numFloatingBufferThreashold,

    • When the total exclusive buffers to use reach the threshold, the number of exclusive buffers per channel is gradually reduced in a fine-grained manner, rather than directly reducing the number of exclusive buffers to 0, the process is as follows.
    • Set the number of exclusive buffers according to the method of gradually decreasing from ExclusiveBuffersPerChannel to 1 until ExclusiveBuffersPerChannel * numChannels<=numFloatingBufferThread, and the remaining buffers are floating. If the number of exclusive buffers is 1 and the total number of exclusive buffers is still greater than numFloatingBufferThread, set the number of exclusive buffers per channel as 0 to use all-buffer-floating.

    • When using all-buffer-floating, FloatingBuffersPerGate is within the range of [numFloatingBufferThreashold, ExclusiveBuffersPerChannel * numChannels + DefaultFloatingBuffersPerGate], which can ensure that the required size of Shuffle read memory is decoupled from the parallelisms. 

To test the changes, a test job is created. The logic of this test job is simple. The Source continuously sends data to Sink through an ALL-TO-ALL edge, no other computing logic is introduced, and pure Shuffle performance can be tested. Through shuffle job tests, the performance of streaming jobs and batch jobs is affected differently by all-buffer-floating. In the batch job scenario, when using all-buffer-floating, the performance does not fall back for the high parallelism jobs. The impact is greater when running lower parallelism, and the performance regression is within 4%. In the streaming job scenario, when using all-buffer-floating, the performance will be greatly affected, and even the performance regression will be 20%~50%. Because this is a pure shuffle test, the performance regression in the production scenario will be smaller than these test results.

Parallelism

Batch Jobs

Stream Jobs

<=500

Regression is within 4%

Regression is 20% ~ 50%

>500

No performance regression.

Based on the test results, this feature is opt-in for streaming jobs by setting the default value to Integer.MAX_VALUE, and the behavior of streaming jobs remains the same as it is now. For batch jobs, to ensure performance and reduce the probability of insufficient memory exceptions, a threshold, by default 1000 (2 * 500, ExclusiveBuffersPerChannel is 2 and numChannels is 500),  is introduced to control whether all-buffer-floating is used. When the total number of buffers used is less than the threshold, all-buffer-floating is not enabled for performance reasons. When the total number of buffers used is greater than the threshold, all buffers of the batch job will use Floating Buffers.

Config option

Config details

Reason

taskmanager.memory.network.read-buffer.required-per-gate.max

1. If this option is not configured, the default value for the Batch job is 1000, and the default value for the Stream job is Integer.MAX_VALUE.

2. If this option is configured, all jobs will take effect.

This config option is introduced to improve the usability of Flink, reduce the probability of insufficient network memory exceptions, and reduce the performance regression when all buffers use Floating Buffers(all-buffer-floating).

By default, most user jobs can run normally without configuring this config option.

According to our experimental experience, the old configs are adjusted only to avoid insufficient memory exceptions(set taskmanager.network.memory.buffers-per-channel to 0 and set taskmanager.network.memory.floating-buffers-per-gate to a large value). After the new configuration is introduced, it is no longer necessary to adjust the old configs, so the old configs can be marked as deprecated.

Flink network memory size

Config option

Change

Reason

taskmanager.memory.network.max

Set the default value to MemorySize.MAX_VALUE, which means that the maximum network memory size is no longer limited, and the network memory size can be increased by just increasing the total TM memory size.

For ease of use, the default value is modified to reduce the number of config options to be adjusted when adjusting the network memory size. For most cases, users only need to change the total TM memory size. This configuration option was not removed for forward compatibility.

During TPC-DS tests, the network memory default configuration may cause job failure due to insufficient memory when running in high parallelism(for example, 1000). After setting the default value of taskmanager.memory.network.read-buffer.required-per-gate.max to MemorySize.MAX_VALUE and introducing the optimization of Flink Shuffle reading buffers above, the total TPC-DS can run successfully. Moreover, no performance regression is observed in CI and NexBenchmark. If users still find a performance regression during use, the user can set taskmanager.memory.network.max to 1g to go back to the state before introducing the feature.


Compatibility, Deprecation, and Migration Plan

  1. When total network memory buffers in an InputGate exceed the configured threshold, all read buffers in InputChannels will change from Exclusive Buffer to Floating Buffer, which may have a certain impact on performance. If users want to go back to the state before introducing the feature, just add this config option, taskmanager.memory.network.read-buffer.required-per-gate.max: 2147483647 (Integer.MAX_VALUE).

  2. After default value of taskmanager.memory.network.max is modified, the TM network may occupy more memory in some cases than before, resulting in a smaller heap or managed memory. Users do not need to configure this option in most cases. If users want to go back to the state before introducing the feature, just add the config option, taskmanager.memory.network.max: 1g.


Test Plan

The change will be tested via Flink micro-benchmark (https://github.com/apache/flink-benchmarks), TPC-DS and nexmark (https://github.com/nexmark/nexmark) multiple times, indicating that the configuration takes effect and there is no performance regression by default.


Rejected Alternatives

  1. Directly modify the default value of the config options, that is, set taskmanager.network.memory.buffers-per-channel to 0, and set taskmanager.network.memory.floating-buffers-per-gate to 2 * numChannels + 8. The reason for rejection is that modifying the default value directly will affect the performance of streaming jobs.


  • No labels