
Status

...



Discussion thread

...

...

...

Jira: FLINK-19582, FLINK-19614

...

Release: 1.13


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  1. Each result partition holds a SortBuffer; serialized records and events are appended to the SortBuffer until it is full or EOF is reached.
  2. The PartitionedFileWriter then spills all data in the SortBuffer as one PartitionedFile in subpartition index order, saving the partition offset information at the same time.
  3. MergePolicy collects information about all spilled PartitionedFiles and selects a subset of them (or all of them) to be merged according to the number of files and the file sizes.
  4. PartitionedFileMerger then merges all the selected PartitionedFiles into one PartitionedFile.
  5. After the SortMergeResultPartition is finished, the consumer task can request the partition data, and a SortMergePartitionReader will be created to read the corresponding data.
  6. The IO scheduler schedules all shuffle data reads in an IO-friendly order, i.e. it reads the shuffle data file sequentially.
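Steps 3 and 4 leave the selection criteria of MergePolicy open. The following Java sketch shows one possible policy under assumed rules; the classes SpilledFile and SizeAndCountMergePolicy and both thresholds are hypothetical and not part of Flink. It only considers files below a size limit (large files can already be read efficiently) and only proposes a merge once enough such small files have accumulated.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical description of a spilled PartitionedFile: only its name and size matter here. */
final class SpilledFile {
    final String name;
    final long sizeBytes;

    SpilledFile(String name, long sizeBytes) {
        this.name = name;
        this.sizeBytes = sizeBytes;
    }
}

/**
 * Hypothetical merge policy: files larger than maxFileSizeToMerge are left alone, and a merge is
 * only proposed once at least minFilesToMerge small files exist.
 */
final class SizeAndCountMergePolicy {
    private final int minFilesToMerge;
    private final long maxFileSizeToMerge;

    SizeAndCountMergePolicy(int minFilesToMerge, long maxFileSizeToMerge) {
        this.minFilesToMerge = minFilesToMerge;
        this.maxFileSizeToMerge = maxFileSizeToMerge;
    }

    /** Returns the files to merge (smallest first), or an empty list if no merge is needed yet. */
    List<SpilledFile> selectFilesToMerge(List<SpilledFile> spilledFiles) {
        List<SpilledFile> candidates = new ArrayList<>();
        for (SpilledFile file : spilledFiles) {
            // Large files can already be read efficiently, so only small files are candidates.
            if (file.sizeBytes <= maxFileSizeToMerge) {
                candidates.add(file);
            }
        }
        if (candidates.size() < minFilesToMerge) {
            return new ArrayList<>();
        }
        candidates.sort(Comparator.comparingLong(file -> file.sizeBytes));
        return candidates;
    }
}
```

The selected files would then be handed to PartitionedFileMerger; choosing the smallest files first keeps the merge cheap while removing the files that cause the most random IO.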

SortBuffer: Data of different channels can be appended to a SortBuffer, and after the SortBuffer is finished, the appended data can be copied out of it in channel index order.

public interface SortBuffer {

    /**
     * Appends data of the specified channel to this {@link SortBuffer} and returns true if all
     * bytes of the source buffer are copied to this {@link SortBuffer} successfully; if it
     * returns false, nothing will be copied.
     */
    boolean append(ByteBuffer source, int targetChannel, Buffer.DataType dataType)
            throws IOException;

    /**
     * Copies data in this {@link SortBuffer} to the target {@link MemorySegment} in channel index
     * order and returns {@link BufferWithChannel} which contains the copied data and the
     * corresponding channel index.
     */
    BufferWithChannel copyIntoSegment(MemorySegment target);

    /** Returns the number of records written to this {@link SortBuffer}. */
    long numRecords();

    /** Returns the number of bytes written to this {@link SortBuffer}. */
    long numBytes();

    /** Returns true if there is still data that can be consumed in this {@link SortBuffer}. */
    boolean hasRemaining();

    /** Finishes this {@link SortBuffer} which means no record can be appended any more. */
    void finish();

    /** Whether this {@link SortBuffer} is finished or not. */
    boolean isFinished();

    /** Releases this {@link SortBuffer} which releases all resources. */
    void release();

    /** Whether this {@link SortBuffer} is released or not. */
    boolean isReleased();
}
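To make the SortBuffer contract more concrete, here is a minimal in-memory sketch of the hash-based data clustering idea mentioned in the implementation plan. It is a simplification, not the Flink implementation: MemorySegment, Buffer.DataType and BufferWithChannel are replaced by plain ByteBuffers and a returned channel index, and HashBasedSortBufferSketch with its copyInto method is a hypothetical name. Appended bytes are grouped per channel, append copies nothing when the record does not fit, and after finish() the data is copied out in channel index order.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

/**
 * Hypothetical in-memory sketch of a hash-based SortBuffer: bytes are clustered per channel on
 * append and copied out in channel index order after finish().
 */
final class HashBasedSortBufferSketch {
    private final ByteArrayOutputStream[] channelData;
    private final long capacityBytes;
    private long numBytes;
    private long numRecords;
    private boolean finished;

    // Read cursor, only valid after finish().
    private int readChannel;
    private int readOffset;
    private byte[] readChannelBytes;

    HashBasedSortBufferSketch(int numChannels, long capacityBytes) {
        this.channelData = new ByteArrayOutputStream[numChannels];
        for (int i = 0; i < numChannels; i++) {
            channelData[i] = new ByteArrayOutputStream();
        }
        this.capacityBytes = capacityBytes;
    }

    /** Returns true if the whole record was copied; returns false and copies nothing otherwise. */
    boolean append(ByteBuffer source, int targetChannel) {
        if (finished) {
            throw new IllegalStateException("SortBuffer is already finished.");
        }
        int size = source.remaining();
        if (numBytes + size > capacityBytes) {
            return false;
        }
        byte[] record = new byte[size];
        source.get(record);
        channelData[targetChannel].write(record, 0, size);
        numBytes += size;
        numRecords++;
        return true;
    }

    long numRecords() {
        return numRecords;
    }

    long numBytes() {
        return numBytes;
    }

    void finish() {
        finished = true;
        skipEmptyChannels();
    }

    boolean hasRemaining() {
        return finished && readChannel < channelData.length;
    }

    /**
     * Copies as much of the current channel's data as fits into target and returns that channel
     * index. Data of two different channels is never copied into the same target.
     */
    int copyInto(ByteBuffer target) {
        if (!hasRemaining() || !target.hasRemaining()) {
            throw new IllegalStateException("Nothing to copy or no space in target.");
        }
        int channel = readChannel;
        int length = Math.min(target.remaining(), readChannelBytes.length - readOffset);
        target.put(readChannelBytes, readOffset, length);
        readOffset += length;
        if (readOffset == readChannelBytes.length) {
            readChannel++;
            skipEmptyChannels();
        }
        return channel;
    }

    /** Moves the read cursor to the start of the next channel that holds data, if any. */
    private void skipEmptyChannels() {
        while (readChannel < channelData.length && channelData[readChannel].size() == 0) {
            readChannel++;
        }
        readOffset = 0;
        readChannelBytes =
                readChannel < channelData.length ? channelData[readChannel].toByteArray() : null;
    }
}
```

Because each copy returns data of a single channel, the caller can write every filled target straight into that subpartition's contiguous section of a PartitionedFile.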

PartitionedFile: Persistent file type of SortMergeResultPartition; it stores the data of all subpartitions in subpartition index order.

public class PartitionedFile {

    public Path getDataFilePath();

    public Path getIndexFilePath();

    public int getNumRegions();

    /**
     * Gets the index entry of the target region and subpartition either from the index data cache
     * or the index data file.
     */
    public void getIndexEntry(FileChannel indexFile, ByteBuffer target, int region, int subpartition)
            throws IOException;

    public void deleteQuietly();
}
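getIndexEntry locates the entry of one (region, subpartition) pair. The sketch below assumes a simple fixed-size layout in which every entry is a long data file offset followed by an int buffer count (12 bytes), stored region by region and, within a region, in subpartition index order; RegionIndexSketch and this exact layout are assumptions for illustration, and the real file format may differ. With fixed-size entries the position is computed directly as (region * numSubpartitions + subpartition) * 12, so no search is needed.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical region index: one (long offset, int numBuffers) entry per subpartition per
 * region, laid out region by region in subpartition index order.
 */
final class RegionIndexSketch {
    static final int INDEX_ENTRY_SIZE = Long.BYTES + Integer.BYTES; // 12 bytes

    private final int numSubpartitions;
    private final ByteBuffer indexData;

    RegionIndexSketch(int numSubpartitions, int numRegions) {
        this.numSubpartitions = numSubpartitions;
        this.indexData = ByteBuffer.allocate(numRegions * numSubpartitions * INDEX_ENTRY_SIZE);
    }

    /** Byte position of the entry for (region, subpartition) in the index data. */
    int entryPosition(int region, int subpartition) {
        return (region * numSubpartitions + subpartition) * INDEX_ENTRY_SIZE;
    }

    void putEntry(int region, int subpartition, long dataFileOffset, int numBuffers) {
        int position = entryPosition(region, subpartition);
        indexData.putLong(position, dataFileOffset);
        indexData.putInt(position + Long.BYTES, numBuffers);
    }

    /** Copies the entry into target and flips target so the caller can read it. */
    void getIndexEntry(ByteBuffer target, int region, int subpartition) {
        int position = entryPosition(region, subpartition);
        target.clear();
        target.putLong(indexData.getLong(position));
        target.putInt(indexData.getInt(position + Long.BYTES));
        target.flip();
    }
}
```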

PartitionedFileWriter: File writer to write buffers to PartitionedFile in subpartition order.

public class PartitionedFileWriter implements AutoCloseable {

    /**
     * Persists the region index of the current data region and starts a new region to write.
     *
     * <p>Note: The caller is responsible for releasing the failed {@link PartitionedFile} if any
     * exception occurs.
     *
     * @param isBroadcastRegion Whether it's a broadcast region. See {@link #isBroadcastRegion}.
     */
    public void startNewRegion(boolean isBroadcastRegion) throws IOException;

    /**
     * Writes a list of {@link Buffer}s to this {@link PartitionedFile}. It guarantees that after
     * the return of this method, the target buffers can be released. In a data region, all data of
     * the same subpartition must be written together.
     *
     * <p>Note: The caller is responsible for recycling the target buffers and releasing the failed
     * {@link PartitionedFile} if any exception occurs.
     */
    public void writeBuffers(List<BufferWithChannel> bufferWithChannels) throws IOException;

    /**
     * Finishes writing the {@link PartitionedFile} which closes the file channel and returns the
     * corresponding {@link PartitionedFile}.
     *
     * <p>Note: The caller is responsible for releasing the failed {@link PartitionedFile} if any
     * exception occurs.
     */
    public PartitionedFile finish() throws IOException;

    /** Used to close and delete the failed {@link PartitionedFile} when any exception occurs. */
    public void releaseQuietly();

    @Override
    public void close() throws IOException;
}
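The region contract of PartitionedFileWriter (start a region, then write each subpartition's buffers together) can be illustrated with an in-memory sketch. RegionWriterSketch and ChannelBuffer are hypothetical stand-ins for PartitionedFileWriter and BufferWithChannel, a ByteArrayOutputStream replaces the file channel, and the broadcast flag is omitted. The writer records the starting offset and buffer count of every subpartition in every region, and rejects a buffer whose subpartition index goes backwards within a region, since that would split one subpartition's data.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for BufferWithChannel: raw bytes tagged with their subpartition. */
final class ChannelBuffer {
    final int channel;
    final byte[] data;

    ChannelBuffer(int channel, byte[] data) {
        this.channel = channel;
        this.data = data;
    }
}

/** Hypothetical in-memory writer recording (offset, numBuffers) per subpartition per region. */
final class RegionWriterSketch {
    private final int numSubpartitions;
    private final ByteArrayOutputStream dataFile = new ByteArrayOutputStream();
    final List<long[]> offsetsPerRegion = new ArrayList<>();
    final List<int[]> buffersPerRegion = new ArrayList<>();
    private int lastChannel;

    RegionWriterSketch(int numSubpartitions) {
        this.numSubpartitions = numSubpartitions;
    }

    /** Starts a new region; the index of the previous region stays in the lists. */
    void startNewRegion() {
        offsetsPerRegion.add(new long[numSubpartitions]);
        buffersPerRegion.add(new int[numSubpartitions]);
        lastChannel = 0;
    }

    void writeBuffers(List<ChannelBuffer> buffers) {
        if (offsetsPerRegion.isEmpty()) {
            throw new IllegalStateException("Call startNewRegion() first.");
        }
        long[] offsets = offsetsPerRegion.get(offsetsPerRegion.size() - 1);
        int[] counts = buffersPerRegion.get(buffersPerRegion.size() - 1);
        for (ChannelBuffer buffer : buffers) {
            // All data of one subpartition must be contiguous within a region.
            if (buffer.channel < lastChannel) {
                throw new IllegalStateException("Subpartition " + buffer.channel + " out of order.");
            }
            if (counts[buffer.channel] == 0) {
                offsets[buffer.channel] = dataFile.size();
            }
            counts[buffer.channel]++;
            dataFile.write(buffer.data, 0, buffer.data.length);
            lastChannel = buffer.channel;
        }
    }

    /** Returns the bytes of the "data file" written so far. */
    byte[] finish() {
        return dataFile.toByteArray();
    }
}
```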

PartitionedFileReader: Reader which can read all data of the target subpartition from a PartitionedFile.

class PartitionedFileReader {

    /**
     * Reads a buffer from the current region of the target {@link PartitionedFile} and moves the
     * read position forward.
     *
     * <p>Note: The caller is responsible for recycling the target buffer if any exception occurs.
     *
     * @param target The target {@link MemorySegment} to read data to.
     * @param recycler The {@link BufferRecycler} which is responsible for recycling the target buffer.
     * @return A {@link Buffer} containing the data read.
     */
    @Nullable
    public Buffer readCurrentRegion(MemorySegment target, BufferRecycler recycler) throws IOException;

    public boolean hasRemaining() throws IOException;

    /** Gets read priority of this file reader. Smaller value indicates higher priority. */
    public long getPriority();
}
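A matching read-side sketch: the reader of one subpartition jumps to that subpartition's offset in each region and reads the recorded number of buffers before moving on to the next region. The on-disk format here (a 4-byte length header before each buffer) and the class SubpartitionReaderSketch are assumptions made for the example, and a ByteBuffer over a byte array stands in for the file channel.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical reader of one subpartition. regionOffsets[r] and regionNumBuffers[r] are the
 * index entry of this subpartition in region r; each buffer is assumed to be stored as a
 * 4-byte length header followed by its bytes.
 */
final class SubpartitionReaderSketch {
    private final ByteBuffer dataFile;
    private final long[] regionOffsets;
    private final int[] regionNumBuffers;
    private int currentRegion = -1;
    private int buffersLeftInRegion;

    SubpartitionReaderSketch(byte[] dataFile, long[] regionOffsets, int[] regionNumBuffers) {
        this.dataFile = ByteBuffer.wrap(dataFile);
        this.regionOffsets = regionOffsets;
        this.regionNumBuffers = regionNumBuffers;
        moveToNextNonEmptyRegion();
    }

    boolean hasRemaining() {
        return buffersLeftInRegion > 0;
    }

    /** Reads the next buffer of the current region, or returns null if nothing is left. */
    byte[] readCurrentRegion() {
        if (!hasRemaining()) {
            return null;
        }
        int length = dataFile.getInt();
        byte[] buffer = new byte[length];
        dataFile.get(buffer);
        if (--buffersLeftInRegion == 0) {
            moveToNextNonEmptyRegion();
        }
        return buffer;
    }

    /** Skips regions holding no data of this subpartition and "seeks" to the next one. */
    private void moveToNextNonEmptyRegion() {
        do {
            currentRegion++;
        } while (currentRegion < regionOffsets.length && regionNumBuffers[currentRegion] == 0);
        if (currentRegion < regionOffsets.length) {
            dataFile.position((int) regionOffsets[currentRegion]);
            buffersLeftInRegion = regionNumBuffers[currentRegion];
        }
    }
}
```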

SortMergeResultPartition: Entry point of the sort-merge based blocking shuffle. (Methods marked with @Override are inherited from ResultPartition.)

public class SortMergeResultPartition extends ResultPartition {

    @Override
    public void setup() throws IOException;

    @Override
    protected void releaseInternal();

    @Override
    public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException;

    @Override
    public void broadcastRecord(ByteBuffer record) throws IOException;

    @Override
    public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException;

    /** Spills the large record into the target {@link PartitionedFile} as a separate data region. */
    private void writeLargeRecord(
            ByteBuffer record, int targetSubpartition, DataType dataType, boolean isBroadcast)
            throws IOException;

    @Override
    public void finish() throws IOException;

    @Override
    public void close();

    @Override
    public ResultSubpartitionView createSubpartitionView(
            int subpartitionIndex, BufferAvailabilityListener availabilityListener)
            throws IOException;

    @Override
    public void flushAll();

    @Override
    public void flush(int subpartitionIndex);

    @Override
    public CompletableFuture<?> getAvailableFuture();

    @Override
    public int getNumberOfQueuedBuffers();

    @Override
    public int getNumberOfQueuedBuffers(int targetSubpartition);
}
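The control flow of emitRecord can be sketched as follows, assuming that a record which does not fit is handled by spilling the current sort buffer as one data region and retrying, and that a record which does not fit even into an empty sort buffer goes through writeLargeRecord as a separate region. EmitRecordFlowSketch is hypothetical: the sort buffer is modeled only by its byte capacity (per-subpartition clustering is omitted), and spilled regions are logged as strings instead of being written to a PartitionedFile.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the emitRecord control flow. The sort buffer is modeled by its byte
 * capacity only, and each spilled region is logged as a string instead of being written to disk.
 */
final class EmitRecordFlowSketch {
    private final int sortBufferCapacity;
    private int sortBufferUsed;
    private int sortBufferRecords;
    final List<String> spilledRegions = new ArrayList<>();

    EmitRecordFlowSketch(int sortBufferCapacity) {
        this.sortBufferCapacity = sortBufferCapacity;
    }

    void emitRecord(int recordSize, int targetSubpartition) {
        if (tryAppend(recordSize)) {
            return;
        }
        // The sort buffer is full: spill its content as one data region and retry.
        flushSortBuffer();
        if (tryAppend(recordSize)) {
            return;
        }
        // Even an empty sort buffer cannot hold the record: write it as a separate region,
        // which is the role of writeLargeRecord.
        spilledRegions.add("large:" + recordSize + "@" + targetSubpartition);
    }

    /** Spills whatever is still buffered when the partition is finished. */
    void finish() {
        flushSortBuffer();
    }

    private boolean tryAppend(int recordSize) {
        if (sortBufferUsed + recordSize > sortBufferCapacity) {
            return false;
        }
        sortBufferUsed += recordSize;
        sortBufferRecords++;
        return true;
    }

    private void flushSortBuffer() {
        if (sortBufferRecords == 0) {
            return;
        }
        spilledRegions.add("region:" + sortBufferRecords);
        sortBufferUsed = 0;
        sortBufferRecords = 0;
    }
}
```

Flushing the buffered records before writing a large record keeps the regions in the order in which the data was emitted.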

SortMergePartitionReader: Subpartition data reader for SortMergeResultPartition. (Methods marked with @Override are mainly inherited from ResultSubpartitionView.)


public class SortMergeSubpartitionReader
        implements ResultSubpartitionView, Comparable<SortMergeSubpartitionReader> {

    @Nullable
    @Override
    public BufferAndBacklog getNextBuffer();

    /** This method is called by the IO thread of {@link SortMergeResultPartitionReadScheduler}. */
    public boolean readBuffers(Queue<MemorySegment> buffers, BufferRecycler recycler)
            throws IOException;

    public CompletableFuture<?> getReleaseFuture();

    public void fail(Throwable throwable);

    @Override
    public void notifyDataAvailable();

    @Override
    public int compareTo(SortMergeSubpartitionReader that);

    @Override
    public void releaseAllResources();

    @Override
    public boolean isReleased();

    @Override
    public void resumeConsumption();

    @Override
    public Throwable getFailureCause();

    @Override
    public boolean isAvailable(int numCreditsAvailable);

    @Override
    public int unsynchronizedGetNumberOfQueuedBuffers();
}
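SortMergeSubpartitionReader implements Comparable, and PartitionedFileReader exposes a priority where a smaller value means higher priority. Assuming the priority is the file offset of a reader's next buffer, a scheduler that always serves the reader with the smallest offset turns reads from many concurrent consumers into a mostly sequential scan of the data file, which is the IO-friendly order described in step 6. ScheduledReader and ReadSchedulerSketch are hypothetical names, and each reader only tracks offsets instead of reading real data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Hypothetical reader that only tracks the data file offsets of its remaining buffers. Its
 * priority is the offset of its next buffer (smaller value means higher priority).
 */
final class ScheduledReader implements Comparable<ScheduledReader> {
    private final long[] bufferOffsets;
    private int next;

    ScheduledReader(long... bufferOffsets) {
        this.bufferOffsets = bufferOffsets;
    }

    boolean hasRemaining() {
        return next < bufferOffsets.length;
    }

    long getPriority() {
        return bufferOffsets[next];
    }

    /** "Reads" the next buffer and returns its file offset. */
    long readBuffer() {
        return bufferOffsets[next++];
    }

    @Override
    public int compareTo(ScheduledReader that) {
        return Long.compare(getPriority(), that.getPriority());
    }
}

final class ReadSchedulerSketch {
    /** Serves readers in file offset order and returns the offsets in the order they were read. */
    static List<Long> readAll(List<ScheduledReader> readers, int buffersPerRound) {
        PriorityQueue<ScheduledReader> queue = new PriorityQueue<>();
        for (ScheduledReader reader : readers) {
            if (reader.hasRemaining()) {
                queue.add(reader);
            }
        }
        List<Long> readOrder = new ArrayList<>();
        while (!queue.isEmpty()) {
            ScheduledReader reader = queue.poll();
            for (int i = 0; i < buffersPerRound && reader.hasRemaining(); i++) {
                readOrder.add(reader.readBuffer());
            }
            // The priority changed after reading, so re-insert instead of mutating in place.
            if (reader.hasRemaining()) {
                queue.add(reader);
            }
        }
        return readOrder;
    }
}
```

With one buffer per round, readers at offsets {0, 100, 400}, {200, 300} and {50} are served in strictly increasing offset order, so the file is scanned sequentially.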

The SortBuffer and PartitionedFileMerger interfaces are flexible enough that new requirements, such as sorting by record, can also be implemented easily if needed.

...

Basic shuffle logic and data compression will be implemented first, which makes the sort-merge based blocking shuffle available for use. Main components include 1) SortBuffer and a hash-based data clustering implementation; 2) PartitionedFile together with the corresponding writer (PartitionedFileWriter) and reader (PartitionedFileReader); 3) SortMergeResultPartition and the subpartition data reader SortMergePartitionReader. We will introduce these components separately. For data compression, only a very small change is needed because the facilities implemented for the existing BoundedBlockingResultPartition can be reused. Tests will include unit tests, IT cases and real job tests on a cluster. In addition, IO scheduling can improve shuffle performance significantly, so it will also be implemented in this step.

File merge, IO scheduling and other optimizations can be implemented as the second step. Main components include MergePolicy, PartitionedFileMerger, the IO scheduler and a buffer pool for batch shuffle. Tests will include unit tests, IT cases and real job tests on a cluster.

...