...

  1. Each result partition holds a SortBuffer; serialized records and events are appended to the SortBuffer until it is full or EOF is reached.
  2. The PartitionedFileWriter then spills all data in the SortBuffer as one PartitionedFile in subpartition index order, saving the partition offset information at the same time.
  3. MergePolicy collects information about all spilled PartitionedFiles and selects a subset (or all) of the files to be merged, according to the number of files and the file sizes.
  4. PartitionedFileMerger then merges all the selected PartitionedFiles into one PartitionedFile.
  5. After the SortMergeResultPartition is finished, the consumer task can request the partition data, and a SortMergePartitionReader is created to read the corresponding data.
      1. The IO scheduler schedules all shuffle data reads in an IO-friendly order, i.e. reading the shuffle data file sequentially.
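Step 3 only says that files are selected according to the number of files and the file size. A minimal sketch of one possible MergePolicy, assuming a hypothetical rule: once more than `maxFiles` PartitionedFiles have been spilled, merge the smallest files whose combined size stays within `maxMergeBytes`. `SpilledFile`, `SimpleMergePolicy`, and both thresholds are illustrative names, not part of this design (Java 16+ for records).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical metadata of one spilled PartitionedFile (not a Flink class). */
record SpilledFile(String name, long sizeBytes) {}

/**
 * Hypothetical merge policy: once more than maxFiles files have been spilled, select the
 * smallest files whose total size stays within maxMergeBytes.
 */
final class SimpleMergePolicy {
    private final int maxFiles;
    private final long maxMergeBytes;

    SimpleMergePolicy(int maxFiles, long maxMergeBytes) {
        this.maxFiles = maxFiles;
        this.maxMergeBytes = maxMergeBytes;
    }

    /** Returns the files to merge, or an empty list if no merge is worthwhile yet. */
    List<SpilledFile> select(List<SpilledFile> spilled) {
        if (spilled.size() <= maxFiles) {
            return List.of(); // few enough files: keep spilling
        }
        List<SpilledFile> bySize = new ArrayList<>(spilled);
        bySize.sort(Comparator.comparingLong(SpilledFile::sizeBytes)); // smallest first
        List<SpilledFile> selected = new ArrayList<>();
        long totalBytes = 0;
        for (SpilledFile file : bySize) {
            if (totalBytes + file.sizeBytes() > maxMergeBytes) {
                break;
            }
            selected.add(file);
            totalBytes += file.sizeBytes();
        }
        // Merging fewer than two files would not reduce the number of files.
        return selected.size() >= 2 ? selected : List.of();
    }
}
```

Choosing the smallest files first bounds the IO cost of each merge while still reducing the file count; a real policy could weigh file count and size differently.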

SortBuffer: Data of different channels can be appended to a SortBuffer, and after the SortBuffer is finished, the appended data can be copied from it in channel index order.

public interface SortBuffer {

    /**
     * Appends data of the specified channel to this {@link SortBuffer} and returns true if all
     * bytes of the source buffer are copied to this {@link SortBuffer} successfully; if it returns
     * false, nothing is copied.
     */
    boolean append(ByteBuffer source, int targetChannel, Buffer.DataType dataType)
            throws IOException;

    /**
     * Copies data in this {@link SortBuffer} to the target {@link MemorySegment} in channel index
     * order and returns a {@link BufferWithChannel} which contains the copied data and the
     * corresponding channel index.
     */
    BufferWithChannel copyIntoSegment(MemorySegment target);

    /** Returns the number of records written to this {@link SortBuffer}. */
    long numRecords();

    /** Returns the number of bytes written to this {@link SortBuffer}. */
    long numBytes();

    /** Returns true if there is still data that can be consumed in this {@link SortBuffer}. */
    boolean hasRemaining();

    /** Finishes this {@link SortBuffer}, after which no more records can be appended. */
    void finish();

    /** Whether this {@link SortBuffer} is finished or not. */
    boolean isFinished();

    /** Releases this {@link SortBuffer}, which releases all resources. */
    void release();

    /** Whether this {@link SortBuffer} is released or not. */
    boolean isReleased();
}
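To make the append/copy contract concrete, here is a simplified, self-contained sketch of a clustering sort buffer in the spirit of the hash-based data clustering implementation mentioned in the implementation plan. Each record goes into a per-channel queue, and channels are drained in ascending index order. It uses plain byte arrays instead of `MemorySegment`/`BufferWithChannel`, ignores `Buffer.DataType`, and never splits a record across segments; `ClusteringSortBuffer` and `ChannelChunk` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for BufferWithChannel: bytes copied out for one channel. */
record ChannelChunk(int channel, byte[] data) {}

/** Simplified sort buffer: records are clustered per channel and drained in channel order. */
final class ClusteringSortBuffer {
    private final List<ArrayDeque<byte[]>> channels = new ArrayList<>();
    private final long capacityBytes;
    private long numBytes;
    private int readChannel; // next channel to drain
    private boolean finished;

    ClusteringSortBuffer(int numChannels, long capacityBytes) {
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayDeque<>());
        }
        this.capacityBytes = capacityBytes;
    }

    /** Copies the whole record and returns true, or copies nothing and returns false if full. */
    boolean append(ByteBuffer source, int targetChannel) {
        if (finished) {
            throw new IllegalStateException("Sort buffer is already finished.");
        }
        int size = source.remaining();
        if (numBytes + size > capacityBytes) {
            return false;
        }
        byte[] copy = new byte[size];
        source.get(copy);
        channels.get(targetChannel).add(copy);
        numBytes += size;
        return true;
    }

    void finish() {
        finished = true;
    }

    boolean hasRemaining() {
        for (int i = readChannel; i < channels.size(); i++) {
            if (!channels.get(i).isEmpty()) {
                return true;
            }
        }
        return false;
    }

    /** Copies whole records of the next non-empty channel into a segment of segmentSize bytes. */
    ChannelChunk copyIntoSegment(int segmentSize) {
        if (!finished || !hasRemaining()) {
            throw new IllegalStateException("Sort buffer is not finished or has no data left.");
        }
        while (channels.get(readChannel).isEmpty()) {
            readChannel++;
        }
        ArrayDeque<byte[]> queue = channels.get(readChannel);
        ByteBuffer segment = ByteBuffer.allocate(segmentSize);
        while (!queue.isEmpty() && queue.peek().length <= segment.remaining()) {
            segment.put(queue.poll());
        }
        if (segment.position() == 0) {
            // A real implementation may split large records across segments; this sketch does not.
            throw new IllegalArgumentException("Record is larger than the segment.");
        }
        byte[] data = new byte[segment.position()];
        segment.flip();
        segment.get(data);
        return new ChannelChunk(readChannel, data);
    }
}
```

Records of the same channel keep their append order, which is what lets the writer emit each subpartition's data contiguously.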

PartitionedFile: Persistent file type of SortMergeResultPartition; it stores the data of all subpartitions in subpartition index order.

public class PartitionedFile {

    public Path getDataFilePath();

    public Path getIndexFilePath();

    int getNumRegions();

    /**
     * Gets the index entry of the target region and subpartition either from the index data cache
     * or the index data file.
     */
    void getIndexEntry(FileChannel indexFile, ByteBuffer target, int region, int subpartition)
            throws IOException;

    public void deleteQuietly();
}
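Because a PartitionedFile is written region by region, each region needs one index entry per subpartition recording where that subpartition's data starts and how many buffers it has. The sketch below assumes a fixed-size entry (an 8-byte offset followed by a 4-byte buffer count) laid out region by region, subpartition by subpartition, so locating the entry for `(region, subpartition)` is plain arithmetic. `RegionIndex` and this layout are assumptions for illustration, not the documented file format.

```java
import java.nio.ByteBuffer;

/** Hypothetical in-memory model of a region-based PartitionedFile index. */
final class RegionIndex {
    /** Assumed entry layout: 8-byte data file offset followed by a 4-byte buffer count. */
    static final int ENTRY_SIZE = Long.BYTES + Integer.BYTES;

    private final int numSubpartitions;
    private final ByteBuffer index;

    RegionIndex(int numSubpartitions, int numRegions) {
        this.numSubpartitions = numSubpartitions;
        this.index = ByteBuffer.allocate(numRegions * numSubpartitions * ENTRY_SIZE);
    }

    void put(int region, int subpartition, long offset, int numBuffers) {
        int position = position(region, subpartition);
        index.putLong(position, offset);
        index.putInt(position + Long.BYTES, numBuffers);
    }

    long offset(int region, int subpartition) {
        return index.getLong(position(region, subpartition));
    }

    int numBuffers(int region, int subpartition) {
        return index.getInt(position(region, subpartition) + Long.BYTES);
    }

    /** Entries are stored region by region, then subpartition by subpartition. */
    private int position(int region, int subpartition) {
        return (region * numSubpartitions + subpartition) * ENTRY_SIZE;
    }
}
```

With fixed-size entries, a reader can fetch a single entry from the index file with one positioned read instead of loading the whole index.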

PartitionedFileWriter: File writer to write buffers to PartitionedFile in subpartition order.

public class PartitionedFileWriter implements AutoCloseable {

    /**
     * Persists the region index of the current data region and starts a new region to write.
     *
     * <p>Note: The caller is responsible for releasing the failed {@link PartitionedFile} if any
     * exception occurs.
     *
     * @param isBroadcastRegion Whether it's a broadcast region. See {@link #isBroadcastRegion}.
     */
    public void startNewRegion(boolean isBroadcastRegion) throws IOException;

    /**
     * Writes a list of {@link Buffer}s to this {@link PartitionedFile}. It guarantees that after
     * the return of this method, the target buffers can be released. In a data region, all data of
     * the same subpartition must be written together.
     *
     * <p>Note: The caller is responsible for recycling the target buffers and releasing the failed
     * {@link PartitionedFile} if any exception occurs.
     */
    public void writeBuffers(List<BufferWithChannel> bufferWithChannels) throws IOException;

    /**
     * Finishes writing the {@link PartitionedFile}, which closes the file channel and returns the
     * corresponding {@link PartitionedFile}.
     *
     * <p>Note: The caller is responsible for releasing the failed {@link PartitionedFile} if any
     * exception occurs.
     */
    public PartitionedFile finish() throws IOException;

    /** Used to close and delete the failed {@link PartitionedFile} when any exception occurs. */
    public void releaseQuietly();

    @Override
    public void close() throws IOException;
}
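The writer contract (start a region, write each subpartition's buffers contiguously, then finish) can be sketched with an in-memory byte stream standing in for the data file. Assumptions: `ChannelBuffer` stands in for `BufferWithChannel`, a broadcast region stores its data once and every subpartition's index entry points at it, and subpartitions without data in a region get an empty entry. All class names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for BufferWithChannel. */
record ChannelBuffer(int channel, byte[] data) {}

/** Hypothetical index entry: where a subpartition's data starts in a region and its buffer count. */
record IndexEntry(long offset, int numBuffers) {}

/** In-memory sketch of a region-based partitioned file writer (not the real implementation). */
final class InMemoryPartitionedFileWriter {
    private final int numSubpartitions;
    private final ByteArrayOutputStream dataFile = new ByteArrayOutputStream();
    private final List<IndexEntry[]> regionIndexes = new ArrayList<>();
    private IndexEntry[] currentRegion; // null until startNewRegion() is called
    private boolean broadcastRegion;
    private int lastChannel;

    InMemoryPartitionedFileWriter(int numSubpartitions) {
        this.numSubpartitions = numSubpartitions;
    }

    /** Persists the index of the current region (if any) and starts a new region. */
    void startNewRegion(boolean isBroadcastRegion) {
        persistCurrentRegion();
        currentRegion = new IndexEntry[numSubpartitions];
        broadcastRegion = isBroadcastRegion;
        lastChannel = -1;
    }

    /** Appends buffers; within one region, each subpartition's buffers must be contiguous. */
    void writeBuffers(List<ChannelBuffer> buffers) {
        if (currentRegion == null) {
            throw new IllegalStateException("Call startNewRegion() first.");
        }
        for (ChannelBuffer buffer : buffers) {
            // Assumption: a broadcast region stores its data once, shared by all subpartitions.
            int channel = broadcastRegion ? 0 : buffer.channel();
            if (channel < lastChannel) {
                throw new IllegalStateException("Data of a subpartition must be written together.");
            }
            IndexEntry entry = currentRegion[channel];
            currentRegion[channel] = entry == null
                    ? new IndexEntry(dataFile.size(), 1)
                    : new IndexEntry(entry.offset(), entry.numBuffers() + 1);
            dataFile.write(buffer.data(), 0, buffer.data().length);
            lastChannel = channel;
        }
    }

    /** Persists the last region index and returns the data file contents. */
    byte[] finish() {
        persistCurrentRegion();
        currentRegion = null;
        return dataFile.toByteArray();
    }

    IndexEntry indexEntry(int region, int subpartition) {
        return regionIndexes.get(region)[subpartition];
    }

    private void persistCurrentRegion() {
        if (currentRegion == null) {
            return;
        }
        long regionEnd = dataFile.size();
        IndexEntry[] entries = new IndexEntry[numSubpartitions];
        for (int i = 0; i < numSubpartitions; i++) {
            IndexEntry entry = broadcastRegion ? currentRegion[0] : currentRegion[i];
            // Subpartitions without data get an empty entry at the end of the region.
            entries[i] = entry != null ? entry : new IndexEntry(regionEnd, 0);
        }
        regionIndexes.add(entries);
    }
}
```

Rejecting out-of-order channels is how this sketch enforces the rule that all data of the same subpartition in a region is written together.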

PartitionedFileReader: Reader which can read all data of the target subpartition from a PartitionedFile.

class PartitionedFileReader {

    /**
     * Reads a buffer from the current region of the target {@link PartitionedFile} and moves the
     * read position forward.
     *
     * <p>Note: The caller is responsible for recycling the target buffer if any exception occurs.
     *
     * @param target The target {@link MemorySegment} to read data to.
     * @param recycler The {@link BufferRecycler} which is responsible for recycling the target
     *     buffer.
     * @return A {@link Buffer} containing the data read.
     */
    @Nullable
    public Buffer readCurrentRegion(MemorySegment target, BufferRecycler recycler) throws IOException;

    public boolean hasRemaining() throws IOException;

    /** Gets the read priority of this file reader. A smaller value indicates a higher priority. */
    public long getPriority();
}
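`getPriority()` is what lets an IO scheduler read in the IO-friendly, sequential order described in the workflow above. A minimal sketch, assuming the priority is the file offset of a reader's next read: a `PriorityQueue` serves one region per step from the reader whose next offset is smallest, then re-inserts that reader with its updated priority. `ReaderStub` and `SequentialIoScheduler` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical stand-in for PartitionedFileReader: file offsets of the regions it still reads. */
final class ReaderStub {
    private final long[] regionOffsets;
    private int next;

    ReaderStub(long... regionOffsets) {
        this.regionOffsets = regionOffsets;
    }

    boolean hasRemaining() {
        return next < regionOffsets.length;
    }

    /** Assumed priority: file offset of the next read; smaller means read sooner. */
    long getPriority() {
        return regionOffsets[next];
    }

    /** "Reads" the current region and returns the offset it was read from. */
    long readCurrentRegion() {
        return regionOffsets[next++];
    }
}

final class SequentialIoScheduler {
    /** Serves one region per step, always choosing the reader with the smallest priority. */
    static List<Long> readAll(List<ReaderStub> readers) {
        PriorityQueue<ReaderStub> queue =
                new PriorityQueue<>(Comparator.comparingLong(ReaderStub::getPriority));
        for (ReaderStub reader : readers) {
            if (reader.hasRemaining()) {
                queue.add(reader);
            }
        }
        List<Long> readOffsets = new ArrayList<>();
        while (!queue.isEmpty()) {
            ReaderStub reader = queue.poll();
            readOffsets.add(reader.readCurrentRegion());
            if (reader.hasRemaining()) {
                queue.add(reader); // re-insert so its new priority is taken into account
            }
        }
        return readOffsets;
    }
}
```

Since a reader is removed from the queue before its priority changes and re-inserted afterwards, the heap ordering stays valid.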

SortMergeResultPartition: Entry point of sort-merge based blocking shuffle. (Methods annotated with @Override are inherited from ResultPartition.)

public class SortMergeResultPartition extends ResultPartition {

    @Override
    public void setup() throws IOException;

    @Override
    protected void releaseInternal();

    @Override
    public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException;

    @Override
    public void broadcastRecord(ByteBuffer record) throws IOException;

    @Override
    public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException;

    /**
     * Spills the large record into the target {@link PartitionedFile} as a separate data region.
     */
    private void writeLargeRecord(
            ByteBuffer record, int targetSubpartition, DataType dataType, boolean isBroadcast)
            throws IOException;

    @Override
    public void finish() throws IOException;

    @Override
    public void close();

    @Override
    public ResultSubpartitionView createSubpartitionView(
            int subpartitionIndex, BufferAvailabilityListener availabilityListener)
            throws IOException;

    @Override
    public void flushAll();

    @Override
    public void flush(int subpartitionIndex);

    @Override
    public CompletableFuture<?> getAvailableFuture();

    @Override
    public int getNumberOfQueuedBuffers();

    @Override
    public int getNumberOfQueuedBuffers(int targetSubpartition);
}
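The interaction between `emitRecord`, the SortBuffer, and `writeLargeRecord` can be modeled as follows: try to append; if the buffer is full, spill it as one data region sorted by subpartition and retry; if the record still does not fit into an empty buffer, spill it as a separate region. This is a simplified model of that control flow under stated assumptions (record sizes only, no real IO, no broadcast or events); `EmitFlowSketch` and `Pending` are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical pending record: target subpartition and size in bytes. */
record Pending(int subpartition, int bytes) {}

/** Simplified model of the emitRecord / spill / writeLargeRecord control flow. */
final class EmitFlowSketch {
    private final int sortBufferCapacity;
    private final List<Pending> sortBuffer = new ArrayList<>();
    private int bufferedBytes;
    /** Each inner list models one data region spilled to the PartitionedFile. */
    final List<List<Pending>> regions = new ArrayList<>();

    EmitFlowSketch(int sortBufferCapacity) {
        this.sortBufferCapacity = sortBufferCapacity;
    }

    void emitRecord(ByteBuffer record, int targetSubpartition) {
        int size = record.remaining();
        if (append(size, targetSubpartition)) {
            return;
        }
        spillSortBuffer(); // buffer is full: spill it as one region, then retry
        if (append(size, targetSubpartition)) {
            return;
        }
        writeLargeRecord(size, targetSubpartition); // larger than an empty buffer
    }

    void finish() {
        spillSortBuffer();
    }

    private boolean append(int size, int targetSubpartition) {
        if (bufferedBytes + size > sortBufferCapacity) {
            return false;
        }
        sortBuffer.add(new Pending(targetSubpartition, size));
        bufferedBytes += size;
        return true;
    }

    private void spillSortBuffer() {
        if (sortBuffer.isEmpty()) {
            return;
        }
        List<Pending> region = new ArrayList<>(sortBuffer);
        // List.sort is stable, so records keep their emission order within a subpartition.
        region.sort(Comparator.comparingInt(Pending::subpartition));
        regions.add(region);
        sortBuffer.clear();
        bufferedBytes = 0;
    }

    private void writeLargeRecord(int size, int targetSubpartition) {
        regions.add(List.of(new Pending(targetSubpartition, size)));
    }
}
```

Spilling the pending buffer before writing a large record keeps records of the same subpartition in emission order across regions, and the stable sort preserves their order within a region.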

SortMergePartitionReader: Subpartition data reader for SortMergeResultPartition. (Methods annotated with @Override are inherited from ResultSubpartitionView and BufferRecycler.)

...


    @Nullable
    @Override
    public BufferAndBacklog getNextBuffer() throws IOException;

    @Override
    public void notifyDataAvailable();

    @Override
    public void recycle(MemorySegment segment);

    @Override
    public void releaseAllResources();

    @Override
    public boolean isAvailable(int numCreditsAvailable);
}
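`isAvailable(int numCreditsAvailable)` and the backlog returned by `getNextBuffer()` tie the reader into credit-based flow control. A minimal sketch, assuming that data buffers need one credit each while events can be sent without credit, and that the backlog counts the data buffers still queued; `CreditAwareReaderSketch`, `QueuedBuffer`, and `NextBuffer` are hypothetical stand-ins for the real view and `BufferAndBacklog`.

```java
import java.util.ArrayDeque;

/** Hypothetical queued buffer: payload plus whether it carries an event instead of data. */
record QueuedBuffer(byte[] payload, boolean isEvent) {}

/** Hypothetical analogue of BufferAndBacklog. */
record NextBuffer(QueuedBuffer buffer, int backlog) {}

/** Simplified model of a subpartition reader's credit-based availability. */
final class CreditAwareReaderSketch {
    private final ArrayDeque<QueuedBuffer> readBuffers = new ArrayDeque<>();
    private int dataBacklog; // queued data (non-event) buffers

    /** Called after a buffer has been read from the PartitionedFile. */
    void onBufferRead(QueuedBuffer buffer) {
        readBuffers.add(buffer);
        if (!buffer.isEvent()) {
            dataBacklog++;
        }
    }

    /** Returns the next buffer with the remaining data backlog, or null if nothing is queued. */
    NextBuffer getNextBuffer() {
        QueuedBuffer buffer = readBuffers.poll();
        if (buffer == null) {
            return null;
        }
        if (!buffer.isEvent()) {
            dataBacklog--;
        }
        return new NextBuffer(buffer, dataBacklog);
    }

    /** Data buffers consume one credit each; events can be sent without credit. */
    boolean isAvailable(int numCreditsAvailable) {
        QueuedBuffer head = readBuffers.peek();
        if (head == null) {
            return false;
        }
        return numCreditsAvailable > 0 || head.isEvent();
    }
}
```

Reporting the backlog with every buffer lets the downstream side request enough credits in advance for the data that is already queued.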

The SortBuffer and PartitionedFileMerger interfaces are flexible enough that new requirements, such as sorting by record, can also be implemented easily if needed.

...

Basic shuffle logic and data compression will be implemented first, which makes the sort-merge based blocking shuffle available for use. Main components include 1) SortBuffer and a hash-based data clustering implementation; 2) PartitionedFile together with the corresponding writer (PartitionedFileWriter) and reader (PartitionedFileReader); 3) SortMergeResultPartition and the subpartition data reader SortMergePartitionReader. We will introduce these components separately. For data compression, only a very small change is needed because the facilities implemented for the existing BoundedBlockingResultPartition can be reused. Tests will include unit tests, IT cases, and real job tests on a cluster. In addition, IO scheduling can improve shuffle performance significantly, so we also need to implement it.

File merge, IO scheduling, and other optimizations can be implemented as the second step. Main components include MergePolicy, PartitionedFileMerger, an IO scheduler, and a buffer pool for batch shuffle. Tests will include unit tests, IT cases, and real job tests on a cluster.

...