Status

...

Page properties

Discussion thread: ...

JIRA: FLINK-19582, FLINK-19614

Release: 1.13

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Table of Contents

Motivation

Hash-based blocking shuffle and sort-merge based blocking shuffle are the two main blocking shuffle implementations widely adopted by existing distributed data processing frameworks. A hash-based implementation writes the data sent to different reducer tasks into separate files concurrently, while a sort-merge based implementation writes the data together into a single file and merges small files into bigger ones. Compared to the sort-merge based approach, the hash-based approach has several weak points when it comes to running large scale batch jobs:

  1. Stability: For high parallelism (tens of thousands) batch jobs, the current hash-based blocking shuffle implementation writes too many files concurrently, which puts high pressure on the file system, for example, maintaining too much file metadata and exhausting inodes or file descriptors. All of these can become stability issues. Sort-merge based blocking shuffle does not have this problem because only one file is written at a time for one result partition.
  2. Performance: Large amounts of small shuffle files and random IO can hurt shuffle performance a lot, especially on HDD (for SSD, sequential read is also important because of read-ahead and caching). For batch jobs processing massive data, a small amount of data per subpartition is common because of high parallelism. Besides, data skew is another cause of small subpartition files. By writing the data of all subpartitions together into one file and leveraging IO scheduling, more sequential reads can be achieved.
  3. Resource: For the current hash-based implementation, each subpartition needs at least one buffer. For large scale batch shuffles, the memory consumption can be huge. For example, we need at least 320 MB of network memory per result partition if the parallelism is set to 10000 (see the arithmetic below this list). Because of this huge network memory consumption, it is hard to configure the network memory for large scale batch jobs, and sometimes the parallelism cannot be increased just because of insufficient network memory, which leads to a bad user experience.
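
To make the 320 MB figure in point 3 concrete, here is the arithmetic, assuming Flink's default 32 KB network buffer (memory segment) size: 10,000 subpartitions × 1 buffer per subpartition × 32 KB per buffer = 320 MB of network memory for a single result partition.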

By introducing a sort-merge based blocking shuffle implementation to Flink, we can improve Flink’s capability of running large scale batch jobs.

Public Interfaces

Several new config options will be added to control the behavior of the sort-merge based blocking shuffle. Because sort-merge based blocking shuffle is disabled by default, the default behavior of blocking shuffle stays unchanged.

  • taskmanager.network.sort-merge-blocking-shuffle.min-buffers: Minimum number of network buffers required per sort-merge blocking result partition.
  • taskmanager.network.sort-merge-blocking-shuffle.min-parallelism: Parallelism threshold to switch between sort-merge based blocking shuffle and the default hash-based blocking shuffle.
  • taskmanager.memory.framework.off-heap.batch-shuffle.size: Size of direct memory used by blocking shuffle for shuffle data read.

A fixed number of network buffers per result partition decouples the memory consumption from the parallelism, which is more friendly for large scale batch jobs.
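
For illustration only, a configuration enabling sort-merge based blocking shuffle for jobs with a parallelism of at least 128 could look like the following in flink-conf.yaml (the option names follow the list above; the values are made-up examples, not recommended defaults):

taskmanager.network.sort-merge-blocking-shuffle.min-parallelism: 128
taskmanager.network.sort-merge-blocking-shuffle.min-buffers: 64
taskmanager.memory.framework.off-heap.batch-shuffle.size: 32m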

Proposed Changes


  1. Each result partition holds a SortBuffer; serialized records and events will be appended to the SortBuffer until it is full or EOF is reached.
  2. Then the PartitionedFileWriter will spill all data in the SortBuffer into the PartitionedFile as one data region, in subpartition index order, and at the same time the partition offset information will also be saved (see the write-path sketch after this list).
  3. After the SortMergeResultPartition is finished, the consumer task can request the partition data and a SortMergeSubpartitionReader will be created to read the corresponding data.
  4. The IO scheduler will schedule all the shuffle data reads in an IO friendly order, i.e. reading the shuffle data file sequentially.
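
A minimal sketch of the write path in steps 1 and 2 above (not the actual Flink implementation; flushSortBuffer and writeLargeRecord stand for hypothetical helpers and error handling is omitted):

public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
    if (!sortBuffer.append(record, targetSubpartition, Buffer.DataType.DATA_BUFFER)) {
        // The SortBuffer is full: spill its content as one data region of the PartitionedFile.
        flushSortBuffer(); // hypothetical helper: copies the SortBuffer out, spills it and resets it
        if (!sortBuffer.append(record, targetSubpartition, Buffer.DataType.DATA_BUFFER)) {
            // The record is larger than the whole SortBuffer and is spilled directly to the file.
            writeLargeRecord(record, targetSubpartition, Buffer.DataType.DATA_BUFFER);
        }
    }
}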

SortBuffer: Data of different channels can be appended to a SortBuffer and after the SortBuffer is finished, the appended data can be copied from it in channel index order.

public interface SortBuffer {

    /**
     * Appends data of the specified channel to this {@link SortBuffer} and returns true if all
     * bytes of the source buffer are copied to this {@link SortBuffer} successfully; otherwise,
     * if it returns false, nothing will be copied.
     */
    boolean append(ByteBuffer source, int targetChannel, Buffer.DataType dataType) throws IOException;

    /**
     * Copies data in this {@link SortBuffer} to the target {@link MemorySegment} in channel index
     * order and returns {@link BufferWithChannel} which contains the copied data and the
     * corresponding channel index.
     */
    BufferWithChannel copyIntoSegment(MemorySegment target);

    /** Returns the number of records written to this {@link SortBuffer}. */
    long numRecords();

    /** Returns the number of bytes written to this {@link SortBuffer}. */
    long numBytes();

    /** Returns true if there is still data that can be consumed from this {@link SortBuffer}. */
    boolean hasRemaining();

    /** Finishes this {@link SortBuffer} which means no record can be appended any more. */
    void finish();

    /** Whether this {@link SortBuffer} is finished or not. */
    boolean isFinished();

    /** Releases this {@link SortBuffer} which releases all resources. */
    void release();

    /** Whether this {@link SortBuffer} is released or not. */
    boolean isReleased();
}

PartitionedFile: Persistent file type of SortMergeResultPartition and it stores data of all subpartitions in subpartition index order.

public class PartitionedFile {

    public Path getDataFilePath();

    public Path getIndexFilePath();

    /** Returns the number of data regions in this {@link PartitionedFile}. */
    public int getNumRegions();

    /**
     * Gets the index entry of the target region and subpartition either from the index data cache
     * or the index data file.
     */
    public void getIndexEntry(FileChannel indexFile, ByteBuffer target, int region, int subpartition)
            throws IOException;

    public void deleteQuietly();
}
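
As an illustration of the intended layout (a sketch derived from the interfaces above, not a byte-level specification): the data file consists of one or more data regions, each region stores the buffers of all subpartitions in subpartition index order, and the index file holds one entry per (region, subpartition) pair recording where that subpartition's data starts within the region and how much data it has.

Data file:  [ region 0: subpartition 0 buffers | subpartition 1 buffers | ... ][ region 1: subpartition 0 buffers | ... ] ...
Index file: [ entry(region 0, subpartition 0) ][ entry(region 0, subpartition 1) ] ... [ entry(region 1, subpartition 0) ] ...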

PartitionedFileWriter: File writer to write buffers to PartitionedFile in subpartition order.

public class PartitionedFileWriter implements AutoCloseable {

    /**
     * Persists the region index of the current data region and starts a new region to write.
     *
     * <p>Note: The caller is responsible for releasing the failed {@link PartitionedFile} if any
     * exception occurs.
     *
     * @param isBroadcastRegion Whether it's a broadcast region. See {@link #isBroadcastRegion}.
     */
    public void startNewRegion(boolean isBroadcastRegion) throws IOException;

    /**
     * Writes a list of {@link Buffer}s to this {@link PartitionedFile}. It guarantees that after
     * the return of this method, the target buffers can be released. In a data region, all data of
     * the same subpartition must be written together.
     *
     * <p>Note: The caller is responsible for recycling the target buffers and releasing the failed
     * {@link PartitionedFile} if any exception occurs.
     */
    public void writeBuffers(List<BufferWithChannel> bufferWithChannels) throws IOException;

    /**
     * Finishes writing the {@link PartitionedFile} which closes the file channel and returns the
     * corresponding {@link PartitionedFile}.
     *
     * <p>Note: The caller is responsible for releasing the failed {@link PartitionedFile} if any
     * exception occurs.
     */
    public PartitionedFile finish() throws IOException;

    /** Used to close and delete the failed {@link PartitionedFile} when any exception occurs. */
    public void releaseQuietly();

    @Override
    public void close() throws IOException;
}
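
A hedged sketch of how one spilled SortBuffer could be written as a single data region of the PartitionedFile using the writer above (requestFreeSegment is a hypothetical helper and error handling is omitted):

private void spillSortBuffer(SortBuffer sortBuffer, PartitionedFileWriter fileWriter, boolean isBroadcastRegion)
        throws IOException {
    fileWriter.startNewRegion(isBroadcastRegion);
    List<BufferWithChannel> toWrite = new ArrayList<>();
    while (sortBuffer.hasRemaining()) {
        // Buffers are copied out of the SortBuffer in subpartition index order, as one region requires.
        toWrite.add(sortBuffer.copyIntoSegment(requestFreeSegment()));
    }
    fileWriter.writeBuffers(toWrite);
}

// When the whole result partition is finished, the writer is finished exactly once:
// PartitionedFile partitionedFile = fileWriter.finish();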

PartitionedFileReader: Reader which can read all data of the target subpartition from a PartitionedFile.

class PartitionedFileReader {

    /**
     * Reads a buffer from the current region of the target {@link PartitionedFile} and moves the
     * read position forward.
     *
     * <p>Note: The caller is responsible for recycling the target buffer if any exception occurs.
     *
     * @param target The target {@link MemorySegment} to read data to.
     * @param recycler The {@link BufferRecycler} which is responsible to recycle the target buffer.
     * @return A {@link Buffer} containing the data read.
     */
    @Nullable
    public Buffer readCurrentRegion(MemorySegment target, BufferRecycler recycler) throws IOException;

    public boolean hasRemaining() throws IOException;

    /** Gets read priority of this file reader. Smaller value indicates higher priority. */
    public long getPriority();
}
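
A hedged sketch of how a read could drain one PartitionedFileReader, assuming the free segments and the recycler are provided by the caller and the buffer hand-off to the consumer is omitted:

void readBuffers(PartitionedFileReader fileReader, Queue<MemorySegment> freeSegments, BufferRecycler recycler)
        throws IOException {
    while (fileReader.hasRemaining() && !freeSegments.isEmpty()) {
        Buffer buffer = fileReader.readCurrentRegion(freeSegments.poll(), recycler);
        if (buffer == null) {
            break; // nothing more can be read from the current region for now
        }
        // hand the buffer over to the corresponding SortMergeSubpartitionReader
    }
}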

SortMergeResultPartition: Entry point of sort-merge based blocking shuffle. (Override methods are inherited from ResultPartition)

public class SortMergeResultPartition extends ResultPartition {

    @Override
    public void setup() throws IOException;

    @Override
    protected void releaseInternal();

    @Override
    public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException;

    @Override
    public void broadcastRecord(ByteBuffer record) throws IOException;

    @Override
    public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException;

    /** Spills the large record into the target {@link PartitionedFile} as a separate data region. */
    private void writeLargeRecord(
            ByteBuffer record, int targetSubpartition, DataType dataType, boolean isBroadcast)
            throws IOException;

    @Override
    public void finish() throws IOException;

    @Override
    public void close();

    @Override
    public ResultSubpartitionView createSubpartitionView(
            int subpartitionIndex, BufferAvailabilityListener availabilityListener) throws IOException;

    @Override
    public void flushAll();

    @Override
    public void flush(int subpartitionIndex);

    @Override
    public CompletableFuture<?> getAvailableFuture();

    @Override
    public int getNumberOfQueuedBuffers();

    @Override
    public int getNumberOfQueuedBuffers(int targetSubpartition);
}

SortMergeSubpartitionReader: Subpartition data reader for SortMergeResultPartition. (Override methods are mainly inherited from ResultSubpartitionView)


public class SortMergeSubpartitionReader
        implements ResultSubpartitionView, Comparable<SortMergeSubpartitionReader> {

    @Nullable
    @Override
    public BufferAndBacklog getNextBuffer();

    /** This method is called by the IO thread of {@link SortMergeResultPartitionReadScheduler}. */
    public boolean readBuffers(Queue<MemorySegment> buffers, BufferRecycler recycler) throws IOException;

    public CompletableFuture<?> getReleaseFuture();

    public void fail(Throwable throwable);

    @Override
    public void notifyDataAvailable();

    @Override
    public int compareTo(SortMergeSubpartitionReader that);

    @Override
    public void releaseAllResources();

    @Override
    public boolean isReleased();

    @Override
    public void resumeConsumption();

    @Override
    public Throwable getFailureCause();

    @Override
    public boolean isAvailable(int numCreditsAvailable);

    @Override
    public int unsynchronizedGetNumberOfQueuedBuffers();
}

The SortBuffer interface is flexible enough that new requirements, such as sorting by record, can also be implemented easily if needed.

Further Optimization

...

As discussed above, writing the data of all subpartitions together into one file makes reads and writes more sequential, which already improves the IO performance a lot. Besides, we can further improve the IO performance by scheduling the reading and writing IO requests (especially helpful for reading). When shuffling data, the sequential read is restricted by the amount of data of each subpartition, the size of the read buffer and the available credits of the consumer task. The data read pattern can be summarized as reading chunks of data from different subpartitions in parallel. After the data of all subpartitions has been spilled to one file in subpartition index order, we can rearrange the read requests so that data is always served in subpartition order and as much data as possible is read in one request. By scheduling the read requests, more sequential reads can be achieved and in the best case a data file can be read in a fully sequential way.
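
A hedged sketch of this scheduling idea (not the actual implementation): pending file readers are kept in a priority queue ordered by PartitionedFileReader#getPriority, i.e. by their current file offset, and each scheduling round drains the reader at the head using a loop like the one sketched earlier, so the data file is read as sequentially as the subpartition sizes and the available buffers and credits allow.

PriorityQueue<PartitionedFileReader> pendingReaders =
        new PriorityQueue<>(Comparator.comparingLong(PartitionedFileReader::getPriority));

void scheduleReads(Queue<MemorySegment> freeSegments, BufferRecycler recycler) throws IOException {
    while (!pendingReaders.isEmpty() && !freeSegments.isEmpty()) {
        PartitionedFileReader reader = pendingReaders.poll();
        readBuffers(reader, freeSegments, recycler); // sequential read for this reader, see the earlier sketch
        if (reader.hasRemaining()) {
            pendingReaders.add(reader); // re-enqueue with its updated priority (its new file offset)
        }
    }
}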

Data compression has been implemented for the default hash-based blocking shuffle, which improves the TPC-DS benchmark performance by about 30%. We can also implement data compression for the sort-merge based blocking shuffle.

For result partitions using the broadcast partitioner, we can copy the serialized record to the SortBuffer only once and write only one copy of the data to disk, which can reduce CPU usage and file IO a lot.

If there are multiple disks, load balancing is important for good performance. The simplest way to achieve load balance is to select the disks in a round-robin (rebalance) fashion.

For large scale batch jobs, a large number of network connections will be established, which may cause stability issues. We can restrict the number of concurrent partition requests to relieve the issue. Besides, restricting concurrent partition requests increases the number of network buffers that can be used per remote channel, that is, more credits per channel, which helps the shuffle reader read sequentially. (As mentioned above, the number of available credits can influence sequential reads because we cannot read more buffers than the consumer can process.)

Implementing a stand-alone shuffle service can further improve the shuffle IO performance because it is a centralized service and can collect more information, which can lead to more optimized actions, for example, better node-level load balance, better disk-level load balance, further file merging, node-level IO scheduling and shared read/write buffers and thread pools. It can be introduced in a separate FLIP.

Implementation and Test Plan

Basic shuffle logic and data compression will be implemented first, which makes the sort-merge based blocking shuffle available for use. Main components include 1) SortBuffer and a hash-based data clustering implementation; 2) PartitionedFile together with the corresponding writer (PartitionedFileWriter) and reader (PartitionedFileReader); 3) SortMergeResultPartition and the subpartition data reader SortMergeSubpartitionReader. We will introduce these components separately. For data compression, by reusing the facilities implemented for the existing BoundedBlockingResultPartition, only a very small change is needed. Tests will include unit tests, IT cases and real job tests on a cluster.

...

IO scheduling and other optimizations can be implemented as the second step. Main components include the IO scheduler and the buffer pool for batch shuffle. Tests will include unit tests, IT cases and real job tests on a cluster.

Compatibility, Deprecation, and Migration Plan

The default behavior of Flink stays unchanged. Nothing needs to be done when migrating to the new Flink version.

Appendix

Our goal is to cluster data belonging to the same subpartition together, and sorting is a natural approach. However, we do not need a generic sort implementation. Given that the subpartition index is a sequence of continuous integers starting from 0, bucket sort combined with a linked list is a simpler and more efficient way. Each subpartition takes a bucket and each bucket points to the first record of that subpartition in the binary SortBuffer. Each record also has a pointer pointing to the next record belonging to the same subpartition. The following picture shows how it works:

...
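
A minimal, self-contained illustration of the bucket-sort-with-linked-list idea above (a sketch only, not Flink's actual SortBuffer implementation; the record data itself is omitted and only the index structure is shown):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BucketSortIndex {

    private final int[] firstRecord; // bucket per subpartition: index of its first record, -1 if empty
    private final int[] lastRecord;  // index of the last record of each subpartition, for O(1) appends
    private final List<Integer> nextRecord = new ArrayList<>(); // per record: next record of the same subpartition, -1 if none

    BucketSortIndex(int numSubpartitions) {
        firstRecord = new int[numSubpartitions];
        lastRecord = new int[numSubpartitions];
        Arrays.fill(firstRecord, -1);
        Arrays.fill(lastRecord, -1);
    }

    /** Registers a newly appended record and links it into the list of its subpartition. */
    void addRecord(int subpartition) {
        int recordIndex = nextRecord.size();
        nextRecord.add(-1);
        if (firstRecord[subpartition] < 0) {
            firstRecord[subpartition] = recordIndex;
        } else {
            nextRecord.set(lastRecord[subpartition], recordIndex);
        }
        lastRecord[subpartition] = recordIndex;
    }

    /** Returns the record indexes clustered by subpartition, i.e. the order used when spilling. */
    List<Integer> recordsInSubpartitionOrder() {
        List<Integer> order = new ArrayList<>(nextRecord.size());
        for (int first : firstRecord) {
            for (int record = first; record >= 0; record = nextRecord.get(record)) {
                order.add(record);
            }
        }
        return order;
    }
}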