Status

Current state: Released

Discussion thread: https://lists.apache.org/thread.html/r11750db945277d944f408eaebbbdc9d595d587fcfb67b015c716404e%40%3Cdev.flink.apache.org%3E

JIRA: FLINK-19582, FLINK-19614

Released: 1.13

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...


@Nullable
@Override
public BufferAndBacklog getNextBuffer() throws IOException;

@Override
public void notifyDataAvailable();

@Override
public void recycle(MemorySegment segment);

@Override
public void releaseAllResources();

@Override
public boolean isAvailable(int numCreditsAvailable);
}

MergePolicy: It is responsible for selecting the PartitionedFiles to be merged into one file.

public interface MergePolicy {

...


PartitionedFileMerger: It is responsible for merging the selected list of PartitionedFiles into one file.

public interface PartitionedFileMerger {

    /**
     * Merges the given list of {@link PartitionedFile} and generates a new merged {@link PartitionedFile}.
     */
    PartitionedFile merge(List<PartitionedFile> mergeCandidates);

    /**
     * Returns the number of available {@link Buffer}s that can be used by merge read.
     */
    int numMergeBuffers();

    /**
     * Closes this {@link PartitionedFileMerger} and recycles all resources.
     */
    void close();
}
The interfaces of SortBuffer and PartitionedFileMerger are flexible enough that new requirements, such as sorting by record, can also be implemented easily if needed.

...

If there are multiple disks, load balancing across them is important for good performance. The simplest way to achieve it is to rebalance disk selection, that is, to rotate through the available disks when choosing where to write.
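The disk selection described above can be sketched as a simple round-robin rotation. This is only an illustration under assumptions: the class name RoundRobinDiskSelector, the method selectNext, and the idea of representing each disk as a directory path string are hypothetical and are not part of this proposal or of Flink's API.

```java
/** Hypothetical helper: hands out data directories in round-robin order. */
final class RoundRobinDiskSelector {

    private final String[] dataDirs;

    /** Index of the directory that will be handed out next. */
    private int next;

    RoundRobinDiskSelector(String... dataDirs) {
        if (dataDirs == null || dataDirs.length == 0) {
            throw new IllegalArgumentException("At least one data directory is required.");
        }
        // Defensive copy so later changes to the caller's array have no effect.
        this.dataDirs = dataDirs.clone();
    }

    /** Returns the directory to use for the next shuffle file; thread-safe. */
    synchronized String selectNext() {
        String dir = dataDirs[next];
        next = (next + 1) % dataDirs.length;
        return dir;
    }
}
```

With three configured directories, consecutive calls to selectNext cycle through them in order and then wrap around, so each disk receives roughly the same number of files. It does not account for differing file sizes or disk speeds.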

For large-scale batch jobs, a large number of network connections will be established, which may cause stability issues. We can restrict the number of concurrent partition requests to relieve this. Restricting concurrent partition requests also increases the number of network buffers that can be used per remote channel, that is, more credits per channel, which helps the shuffle reader read sequentially. (As mentioned above, the number of available credits influences sequential reads because we cannot read more buffers than the consumer can process.)
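To make the effect on credits concrete, the following is a minimal sketch of a limiter for in-flight partition requests. Everything in it is an assumption made for illustration: the class PartitionRequestLimiter, its methods, and the even split of a fixed buffer pool across active channels are hypothetical and do not describe how Flink actually assigns buffers.

```java
/** Hypothetical limiter: caps the number of in-flight partition requests. */
final class PartitionRequestLimiter {

    private final int maxConcurrentRequests;

    /** Number of partition requests currently in flight. */
    private int inFlight;

    PartitionRequestLimiter(int maxConcurrentRequests) {
        if (maxConcurrentRequests <= 0) {
            throw new IllegalArgumentException("maxConcurrentRequests must be positive.");
        }
        this.maxConcurrentRequests = maxConcurrentRequests;
    }

    /** Reserves a slot and returns true if another request may be sent now. */
    synchronized boolean tryAcquire() {
        if (inFlight >= maxConcurrentRequests) {
            return false;
        }
        inFlight++;
        return true;
    }

    /** Frees a slot once a requested partition has been fully consumed. */
    synchronized void release() {
        if (inFlight == 0) {
            throw new IllegalStateException("No partition request is in flight.");
        }
        inFlight--;
    }

    /**
     * Buffers (credits) per active channel, assuming (for illustration only)
     * that a fixed pool is split evenly across the channels that are active.
     */
    static int buffersPerChannel(int totalBuffers, int numChannels, int maxConcurrentRequests) {
        if (numChannels <= 0 || maxConcurrentRequests <= 0) {
            throw new IllegalArgumentException("Channel counts must be positive.");
        }
        int activeChannels = Math.min(numChannels, maxConcurrentRequests);
        return totalBuffers / activeChannels;
    }
}
```

Under the even-split assumption, a pool of 1000 buffers shared by 500 channels gives 2 buffers per channel when all channels are requested at once, but 20 buffers per channel when at most 50 requests are in flight, which leaves more room for sequential reads on the producer side.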

...

Implementing a stand-alone shuffle service can further improve shuffle IO performance because, as a centralized service, it can collect more information and therefore make better-optimized decisions. Examples include better node-level load balancing, better disk-level load balancing, further file merging, node-level IO scheduling, and shared read/write buffers and thread pools. It can be introduced in a separate FLIP.

...