Status

Current state: Under Discussion

...



Discussion thread

...

...

thread.html/r9ddc3bee9649012f91e58c5a4a9a985d2256dc608e8bc37c69e39e52%40%3Cdev.flink.apache.org%3E
Vote thread
JIRA: FLINK-23451

Release: 1.14

...


...


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

The idea is to change the implementation so that the buffers hold exactly the amount of data that can be processed within a configured amount of time.

This should reduce aligned checkpoint times (less in-flight data to process before a checkpoint can complete) and improve the behaviour and performance of unaligned checkpoints (less in-flight data to persist in every unaligned checkpoint). Potentially, this can be an alternative to using unaligned checkpoints.

Public Interfaces

Configuration

...

  • The subtask calculates its throughput and then, based on the configured timeInBufferQueue, calculates the buffer size: bufferSize = throughput * timeInBufferQueue / totalNumberOfBuffers.
  • The subtask sets up the buffer size. It observes changes in throughput and updates the buffer size information during the whole lifetime of the task, but it does not change the size physically: the real size of the downstream buffer always remains the initially configured maximum size.
  • The subtask sends the new buffer size and the number of available buffers to the corresponding upstream subpartition.
  • The upstream uses the received size for newly allocated buffers, while buffers that are already filled are sent as is, whatever their current size. (This is possible because the downstream buffer size was not physically changed, so the downstream can handle any size up to the maximum.)
  • The upstream sends the data and the number of filled buffers to the downstream.
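The size calculation in the first step can be sketched in Java as follows. This is a minimal sketch under stated assumptions, not Flink's implementation: the class name, the millisecond unit for timeInBufferQueue, throughput measured in bytes per second, and the lower bound minBufferSize are all hypothetical. Only the formula and the upper bound (the initially configured maximum size, which the physical buffers keep) come from the steps above.

```java
// Hypothetical sketch of the proposed buffer size formula; names and units are illustrative.
final class BufferSizeCalculator {
    private final long timeInBufferQueueMs; // assumed unit: target time data spends in the buffer queue
    private final int totalNumberOfBuffers; // number of buffers the announced size is spread over
    private final int minBufferSize;        // assumed lower bound, not part of the proposal text
    private final int maxBufferSize;        // initially configured (physical) buffer size in bytes

    BufferSizeCalculator(long timeInBufferQueueMs, int totalNumberOfBuffers,
                         int minBufferSize, int maxBufferSize) {
        if (timeInBufferQueueMs <= 0 || totalNumberOfBuffers <= 0
                || minBufferSize <= 0 || minBufferSize > maxBufferSize) {
            throw new IllegalArgumentException("invalid configuration");
        }
        this.timeInBufferQueueMs = timeInBufferQueueMs;
        this.totalNumberOfBuffers = totalNumberOfBuffers;
        this.minBufferSize = minBufferSize;
        this.maxBufferSize = maxBufferSize;
    }

    /**
     * bufferSize = throughput * timeInBufferQueue / totalNumberOfBuffers,
     * clamped so it never exceeds the physical buffer size (and never drops below the assumed minimum).
     */
    int calculateBufferSize(long throughputBytesPerSecond) {
        long bytesInQueue = throughputBytesPerSecond * timeInBufferQueueMs / 1000L;
        long size = bytesInQueue / totalNumberOfBuffers;
        return (int) Math.max(minBufferSize, Math.min(maxBufferSize, size));
    }
}
```

For example, with timeInBufferQueue = 1 s, 10 buffers and a measured throughput of 100 000 bytes/s, each buffer should hold about 10 000 bytes, so the queue as a whole holds roughly one second of data.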

...

It is not enough for the upstream to know just the number of available buffers (credits) on the downstream, because the sizes of these buffers can differ. We propose to resolve this problem as follows: if the desired downstream buffer size changes, the upstream sends already filled buffers as is, but any buffer allocated after the new buffer size information was received must be no larger than the new size, regardless of how large the current buffer on the upstream is. (pollBuffer should receive a parameter such as bufferSize and return a buffer no larger than it, i.e. the buffer request should contain a new input parameter such as bufferSize.)
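The upstream side of this rule can be modelled with a small Java sketch. Everything here is hypothetical: SubpartitionSketch, onNewBufferSize and filledBufferCapacities are illustrative names, pollBuffer(bufferSize) only stands in for the proposed request parameter, and allocating fresh ByteBuffers is a simplification of pooled network memory. What it shows is that a buffer already being filled keeps its old capacity, while every buffer requested after the announcement is no larger than the new size.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of an upstream subpartition; not a Flink class.
final class SubpartitionSketch {
    private final int maxBufferSize;             // physical size of the pooled buffers
    private int desiredBufferSize;               // last size announced by the downstream
    private final Deque<ByteBuffer> filled = new ArrayDeque<>(); // buffers ready to be sent
    private ByteBuffer current;                  // buffer currently being written, may be null

    SubpartitionSketch(int maxBufferSize) {
        this.maxBufferSize = maxBufferSize;
        this.desiredBufferSize = maxBufferSize;
    }

    /** Downstream announced a new size; only buffers polled from now on are affected. */
    void onNewBufferSize(int newSize) {
        desiredBufferSize = Math.max(1, Math.min(newSize, maxBufferSize));
    }

    /** Stand-in for pollBuffer(bufferSize): the returned capacity never exceeds bufferSize. */
    private static ByteBuffer pollBuffer(int bufferSize) {
        return ByteBuffer.allocate(bufferSize);
    }

    /** Copies data into buffers; the current buffer is completed at its old size. */
    void write(byte[] data) {
        int offset = 0;
        while (offset < data.length) {
            if (current == null) {
                current = pollBuffer(desiredBufferSize);
            }
            int n = Math.min(current.remaining(), data.length - offset);
            current.put(data, offset, n);
            offset += n;
            if (!current.hasRemaining()) {
                filled.add(current); // sent as is, whatever its size
                current = null;
            }
        }
    }

    /** Capacities of the filled buffers, in sending order. */
    List<Integer> filledBufferCapacities() {
        List<Integer> sizes = new ArrayList<>();
        for (ByteBuffer b : filled) {
            sizes.add(b.capacity());
        }
        return sizes;
    }
}
```

Writing 10 bytes with a maximum size of 8, announcing a new size of 4, and writing 10 more bytes yields filled buffers with capacities 8, 8 and 4: the partially filled 8-byte buffer is completed and sent as is, and only the next allocation uses the new size.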

Different subpartitions with different desired buffer sizes

...