Status

...


Discussion thread

...

...

Jira: FLINK-

...

27530

Release: 1.16


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

As mentioned in FLINK-14396, in most cases the RecordWriter is available for network output as long as there is at least one available buffer in the LocalBufferPool. This only covers the case where a single buffer is enough to process a record. Under severe back pressure, if processing a single record requires multiple output buffers, the Task may still block in requestMemory, which keeps the checkpoint from completing quickly. For example:

  • Large records that may span multiple buffers
  • FlatMap-like operators that may emit multiple records for each input record
  • Broadcast watermarks, which may request multiple buffers at once
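The gap between "at least one buffer is available" and "enough buffers for this record" can be illustrated with a minimal sketch. The classes `ToyBufferPool` and `BlockingExample` are hypothetical toy models, not Flink's LocalBufferPool or RecordWriter. `tryRequest` is a non-blocking stand-in that returns false wherever the real requestMemory call would block.

```java
// Hypothetical toy model for illustration only; it is not Flink's LocalBufferPool.
final class ToyBufferPool {
    private int available;

    ToyBufferPool(int available) {
        this.available = available;
    }

    /** Mirrors the "at least one buffer" availability check described above. */
    boolean isAvailable() {
        return available > 0;
    }

    /** Non-blocking stand-in for requestMemory: returns false where the real call would block. */
    boolean tryRequest() {
        if (available == 0) {
            return false;
        }
        available--;
        return true;
    }
}

final class BlockingExample {
    /** Buffers needed for a record of recordBytes when each buffer holds bufferBytes (ceiling division). */
    static int buffersNeeded(int recordBytes, int bufferBytes) {
        return (recordBytes + bufferBytes - 1) / bufferBytes;
    }

    /**
     * Writes one record after the availability check has passed and returns how many of its
     * buffers cannot be served immediately. Any value > 0 means the real task would block
     * in requestMemory in the middle of the record.
     */
    static int missingBuffers(ToyBufferPool pool, int recordBytes, int bufferBytes) {
        if (!pool.isAvailable()) {
            throw new IllegalStateException("the task would not start processing this record");
        }
        int missing = 0;
        for (int i = 0; i < buffersNeeded(recordBytes, bufferBytes); i++) {
            if (!pool.tryRequest()) {
                missing++;
            }
        }
        return missing;
    }
}
```

Take a pool with one free buffer and 32 KiB buffers. A 96 KiB record passes the availability check, yet it is still two buffers short, so the task stalls mid-record.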

This FLIP proposes adding overdraft buffers to reduce the probability that a Task blocks in requestMemory when processing a single record requires multiple output buffers.

Overdraft buffer mechanism: when LocalBufferPool#requestMemory is called and the LocalBufferPool has no buffers left, the LocalBufferPool allows the Task to overdraw a limited number of MemorySegments and becomes unavailable. The LocalBufferPool does not become available again until downstream tasks have consumed all the overdraft buffers and the LocalBufferPool has recycled them.

Public Interfaces


Proposed Changes

The work is split into three tasks, each tracked by its own JIRA issue.

The first task: Ignore max buffers per channel when allocating buffers

Jira: FLINK-27522

The LocalBufferPool is unavailable when the maxBuffersPerChannel limit is reached for a channel or when availableMemorySegments is empty.

When a memory segment is requested from the LocalBufferPool for a channel that has already reached maxBuffersPerChannel, the limit is ignored: the buffer is still allocated as long as availableMemorySegments is not empty. The limit still makes the pool unavailable, but it no longer blocks a request made while a record is being processed.
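A minimal sketch of this rule follows. Under it, availability still honors the per-channel limit, but allocation fails only when the pool is empty. `PerChannelPool` and its fields are hypothetical names for illustration, not Flink's actual implementation. A `false` return from `tryRequest` stands in for the place where the real request would block.

```java
// Hypothetical model of the first task; names are illustrative, not Flink's actual fields.
final class PerChannelPool {
    private final int maxBuffersPerChannel;
    private final int[] buffersPerChannel;
    private int availableSegments;
    private int channelsAtLimit; // channels currently holding >= maxBuffersPerChannel buffers

    PerChannelPool(int numChannels, int availableSegments, int maxBuffersPerChannel) {
        this.buffersPerChannel = new int[numChannels];
        this.availableSegments = availableSegments;
        this.maxBuffersPerChannel = maxBuffersPerChannel;
    }

    /** Availability still honors the per-channel limit. */
    boolean isAvailable() {
        return availableSegments > 0 && channelsAtLimit == 0;
    }

    /** Allocation ignores the per-channel limit; only an empty pool makes it fail (block). */
    boolean tryRequest(int channel) {
        if (availableSegments == 0) {
            return false;
        }
        availableSegments--;
        if (++buffersPerChannel[channel] == maxBuffersPerChannel) {
            channelsAtLimit++; // this channel just reached its limit
        }
        return true;
    }

    void recycle(int channel) {
        availableSegments++;
        if (buffersPerChannel[channel]-- == maxBuffersPerChannel) {
            channelsAtLimit--; // this channel dropped back below its limit
        }
    }
}
```

With a limit of 2 buffers per channel, a third request on the same channel still succeeds while segments remain. The pool stays unavailable until that channel drops below its limit again.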

The second task: Add the overdraft buffer

Jira: FLINK-26762

Add a new configuration option, taskmanager.network.memory.max-overdraft-buffers-per-gate, with a default value of 5.

The LocalBufferPool is unavailable when the maxBuffersPerChannel limit is reached for a channel, when availableMemorySegments is empty, or when numberOfRequestedOverdraftMemorySegments > 0.

When a memory segment is requested, the LocalBufferPool first tries to allocate it from availableMemorySegments. If availableMemorySegments is empty and numberOfRequestedOverdraftMemorySegments < maxOverdraftBuffersPerGate, it requests an overdraft buffer from the NetworkBufferPool.

When a memory segment is recycled and numberOfRequestedOverdraftMemorySegments > 0, the segment first pays back the overdraft, that is, it is returned as an overdraft buffer to the NetworkBufferPool.
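A minimal sketch of the request and recycle rules above is shown below. `OverdraftPool` is a hypothetical model, not Flink's LocalBufferPool, and `globalSegments` is an assumed stand-in for free NetworkBufferPool segments. For brevity, the per-channel limit from the first task is left out of `isAvailable`.

```java
// Hypothetical model of the overdraft rules; not Flink's actual LocalBufferPool.
final class OverdraftPool {
    private final int maxOverdraftBuffersPerGate;
    private int availableSegments;          // segments owned by the local pool
    private int globalSegments;             // assumed stand-in for free NetworkBufferPool segments
    private int requestedOverdraftSegments; // models numberOfRequestedOverdraftMemorySegments

    OverdraftPool(int localSegments, int globalSegments, int maxOverdraftBuffersPerGate) {
        this.availableSegments = localSegments;
        this.globalSegments = globalSegments;
        this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate;
    }

    /** Unavailable while the local pool is empty or any overdraft is outstanding. */
    boolean isAvailable() {
        return availableSegments > 0 && requestedOverdraftSegments == 0;
    }

    /** Returns true if a segment was obtained, false where the real request would block. */
    boolean tryRequest() {
        if (availableSegments > 0) {
            availableSegments--;
            return true;
        }
        if (requestedOverdraftSegments < maxOverdraftBuffersPerGate && globalSegments > 0) {
            globalSegments--; // overdraw one segment from the global pool
            requestedOverdraftSegments++;
            return true;
        }
        return false;
    }

    /** Recycled segments pay back the overdraft before refilling the local pool. */
    void recycle() {
        if (requestedOverdraftSegments > 0) {
            requestedOverdraftSegments--;
            globalSegments++; // overdraft segment goes back to the global pool
        } else {
            availableSegments++;
        }
    }

    int overdraftInUse() {
        return requestedOverdraftSegments;
    }
}
```

With one local segment and the default limit of 5, a record that needs four buffers is written without blocking by overdrawing three segments. The pool then stays unavailable until all overdraft segments are recycled and a local segment is free again.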

The third task: Compatibility with LegacySource

Jira: FLINK-27789

Since LegacySource does not check output availability (checkAvailable) before emitting records, it would use up all overdraft buffers by default, which is not the intended behavior.

Therefore, the number of overdraft buffers is set to 0 for the SourceStreamTask.
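The effect of this setting can be sketched under a worst-case assumption: downstream consumes nothing while a producer that never checks availability keeps requesting buffers. `maxOverdraftFor` and `overdraftTakenByUncheckedProducer` are hypothetical helpers for illustration, not Flink APIs.

```java
// Hypothetical sketch; neither method exists in Flink.
final class LegacySourceExample {
    /** Overdraft limit a task's output pool would get under this proposal. */
    static int maxOverdraftFor(boolean isLegacySourceTask, int configuredMaxOverdraft) {
        return isLegacySourceTask ? 0 : configuredMaxOverdraft;
    }

    /**
     * Simulates a producer that never checks availability and assumes no buffers are
     * recycled meanwhile. It requests buffers until the next request would block and
     * returns how many overdraft buffers it took.
     */
    static int overdraftTakenByUncheckedProducer(int localSegments, int maxOverdraft) {
        int local = localSegments;
        int overdraft = 0;
        while (true) {
            if (local > 0) {
                local--; // use local segments first
                continue;
            }
            if (overdraft < maxOverdraft) {
                overdraft++; // then overdraw up to the limit
                continue;
            }
            return overdraft; // the next request would block
        }
    }
}
```

Without the special case, such a producer drains every overdraft buffer before blocking. With the limit forced to 0, it blocks exactly as it did before overdraft buffers existed.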

Compatibility, Deprecation, and Migration Plan

...

  • Test requesting overdraft buffers when enough overdraft buffers are available
  • Test requesting overdraft buffers when not enough overdraft buffers are available
  • Benchmark checkpoint duration with overdraft buffers enabled

Rejected Alternatives

After discussion, we decided to use an overdraft buffer instead of a reserve buffer. For details, please refer to the mailing list discussion.