...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
- Add a new config option.
- Key
- taskmanager.network.memory.read-buffer.required-per-gate.max
- Default
- (none).
- Type
- Integer.
- Description
- The maximum number of network read buffers that are required by an input gate. (An input gate is responsible for reading data from all subtasks of an upstream task.) The number of buffers needed by an input gate is calculated dynamically at runtime, depending on various factors (e.g., the parallelism of the upstream task). Among the calculated number of needed buffers, the part up to this configured value is required, while the excess part, if any, is optional. A task will fail if the required buffers cannot be obtained at runtime. A task will not fail if it cannot obtain optional buffers, but it may suffer a performance reduction. If not explicitly configured, the default value is Integer.MAX_VALUE for streaming workloads, and 1000 for batch workloads.
- In the first version, this new config will be annotated as @Experimental. We will revisit and try to remove the annotation in the next version.
- Deprecate 2 existing config options
- taskmanager.network.memory.buffers-per-channel
- taskmanager.network.memory.floating-buffers-per-gate
- We will not annotate these 2 configs as @Deprecated until the @Experimental annotation for taskmanager.network.memory.read-buffer.required-per-gate.max is removed.
- Modify the default value of taskmanager.memory.network.max from 1g to MemorySize.MAX_VALUE.
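The required/optional split described for the new read-buffer option can be illustrated with a small sketch. This is a minimal Python illustration under stated assumptions, not Flink code: the function names `effective_max_required` and `split_buffers` are hypothetical. The only facts taken from the description above are the defaults (1000 for batch, Integer.MAX_VALUE for streaming) and the rule that buffers up to the configured value are required while any excess is optional.

```python
from typing import Optional, Tuple

# Java's Integer.MAX_VALUE, used as the streaming default in the description.
INTEGER_MAX_VALUE = 2**31 - 1


def effective_max_required(configured: Optional[int], is_batch: bool) -> int:
    """Return the configured cap, or the workload-specific default when unset."""
    if configured is not None:
        return configured
    return 1000 if is_batch else INTEGER_MAX_VALUE


def split_buffers(needed: int, max_required: int) -> Tuple[int, int]:
    """Split the buffers needed by one input gate into (required, optional)."""
    required = min(needed, max_required)
    optional = needed - required
    return required, optional


if __name__ == "__main__":
    # Hypothetical gate that needs 2500 buffers, with the option left unset.
    print(split_buffers(2500, effective_max_required(None, is_batch=True)))
    print(split_buffers(2500, effective_max_required(None, is_batch=False)))
```

In this sketch, a batch job with the option unset treats only the first 1000 of the needed buffers as required, so the task can still start when the remaining buffers are unavailable. A streaming job with the option unset treats every needed buffer as required.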
...
When ExclusiveBuffersPerChannel * numChannels <= numFloatingBufferThreshold,
The read buffers in an InputGate consist of both Floating Buffers and Exclusive Buffers.
ExclusiveBuffersPerChannel is 2 by default, which is consistent with the current state.
FloatingBuffersPerGate is within the range of [1, DefaultFloatingBuffersPerGate] by default, consistent with the current state.
When ExclusiveBuffersPerChannel * numChannels > numFloatingBufferThreshold,
- When the total number of exclusive buffers exceeds the threshold, the number of exclusive buffers per channel is reduced gradually rather than dropped directly to 0. The process is as follows.
Decrease the number of exclusive buffers per channel step by step from ExclusiveBuffersPerChannel down to 1 until ExclusiveBuffersPerChannel * numChannels <= numFloatingBufferThreshold; the remaining buffers are floating. If the number of exclusive buffers per channel is 1 and the total number of exclusive buffers is still greater than numFloatingBufferThreshold, set the number of exclusive buffers per channel to 0, so that all read buffers in the InputGate are Floating Buffers (all-buffer-floating).
With all-buffer-floating, ExclusiveBuffersPerChannel is 0.
When using all-buffer-floating, FloatingBuffersPerGate is within the range of [numFloatingBufferThreshold, ExclusiveBuffersPerChannel * numChannels + DefaultFloatingBuffersPerGate], which ensures that the required size of shuffle read memory is decoupled from the parallelism.
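The stepwise reduction described above can be sketched as follows. This is a rough Python illustration rather than the actual Flink implementation: the function names are hypothetical, and the example values (a threshold of 1000 and 8 default floating buffers per gate) are assumptions chosen for illustration. The sketch also assumes that the upper bound of the all-buffer-floating range uses the configured ExclusiveBuffersPerChannel, not the effective value of 0.

```python
from typing import Tuple


def exclusive_buffers_per_channel(configured: int, num_channels: int, threshold: int) -> int:
    """Lower the per-channel exclusive count from the configured value down to 1
    until the gate's total exclusive buffers fit the threshold; return 0
    (all-buffer-floating) if even one buffer per channel does not fit."""
    for candidate in range(configured, 0, -1):
        if candidate * num_channels <= threshold:
            return candidate
    return 0


def all_floating_range(configured: int, num_channels: int, threshold: int,
                       default_floating_per_gate: int) -> Tuple[int, int]:
    """Range of FloatingBuffersPerGate when every read buffer is floating."""
    return threshold, configured * num_channels + default_floating_per_gate


if __name__ == "__main__":
    for channels in (100, 800, 2000):
        print(channels, exclusive_buffers_per_channel(2, channels, 1000))
    print(all_floating_range(2, 2000, 1000, 8))
```

With these example values, 100 channels keep 2 exclusive buffers each, 800 channels fall back to 1 each, and 2000 channels switch to all-buffer-floating. The lower bound of the floating range stays at the threshold regardless of the number of channels, which is what decouples the required memory from the parallelism.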
...
Config option | Config details | Reason |
---|---|---|
taskmanager.network.memory.read-buffer.required-per-gate.max | 1. If this option is not configured, the default value is 1000 for batch jobs and Integer.MAX_VALUE for streaming jobs. 2. If this option is configured, the configured value takes effect for all jobs. | This config option is introduced to improve the usability of Flink, reduce the probability of insufficient network memory exceptions, and reduce the performance regression when all buffers are Floating Buffers (all-buffer-floating). |
...
During TPC-DS tests, the default network memory configuration may cause job failures due to insufficient memory when running at high parallelism (for example, 1000). After setting the default value of taskmanager.memory.network.max to MemorySize.MAX_VALUE and introducing the optimization of Flink shuffle read buffers described above, the full TPC-DS suite runs successfully. Moreover, no performance regression is observed in CI and NexBenchmark. Users who still observe a performance regression can set taskmanager.memory.network.max to 1g to go back to the state before the feature was introduced.
...
When the total number of network buffers needed by an InputGate exceeds the configured threshold, the read buffers in its InputChannels change from Exclusive Buffers to Floating Buffers, which may have some impact on performance. Users who want to go back to the state before the feature was introduced can add this config option: taskmanager.network.memory.read-buffer.required-per-gate.max: 2147483647 (Integer.MAX_VALUE).
After the default value of taskmanager.memory.network.max is modified, the TM network may in some cases occupy more memory than before, resulting in a smaller heap or less managed memory. Users do not need to configure this option in most cases. Users who want to go back to the state before the feature was introduced can add this config option: taskmanager.memory.network.max: 1g.
...