Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • All data stored in the single partition file will be transmitted within single batch (equal to partition file) much faster and without the serealization\deserialization overhead. To roughly estimate the superiority of partition file transmitting using network sockets the native Linux scp\rsync commands can be used. The test environment showed us results – 270 MB/s over the current 40 MB/s single-threaded rebalance speed;
  • The zero-copy file transmission can be used [1]. The contents of a file can be transmitted without copying them through the user space. Internally, it depends on the underlying operating system's support for zero copy. For instance, in UNIX and various flavors of Linux, the Java method FileChannel.transfertTo() call is routed to the sendfile() system call;

Objective

Apache Ignite needs to support peer-2-peer cache partition file transfer using zero-copy algorithm based on extension of communication SPI. 

Rebalance process overview

Streaming via CommunicationSpi

Handshake message

The handshake message patched to support new type of connection.

Code Block
languagejava
themeConfluence
titleHandshakeMessage2.java
collapsetrue
/** */
private static final byte PIPE_DATA_TRANSFER_MASK = 0x01;

/**
 * @return If socket will be used to transfer raw files.
 */
public boolean usePipeTransfer() {
    return (flags & PIPE_DATA_TRANSFER_MASK) != 0;
}

/**
 * @param usePipeTransfer {@code True} if socket should be used to transfer raw files.
 */
public final void usePipeTransfer(boolean usePipeTransfer) {
    flags = usePipeTransfer ? 
		(byte)(flags | PIPE_DATA_TRANSFER_MASK) : (byte)(flags & ~PIPE_DATA_TRANSFER_MASK);
}

Public API

Communication SPI 

Communication SPI support new connection type to communicate with peers via sockets.

Code Block
languagejava
titleCommunicationSpi.java
collapsetrue
/**
 * @return {@code True} if new type of direct connections supported.
 */
public default boolean pipeConnectionSupported() {
    return false;
}
  
/**
 * @param src Source cluster node to initiate connection with.
 * @return Channel to listen.
 * @throws IgniteSpiException If fails.
 */
public default ReadableByteChannel getRemotePipe(ClusterNode src) throws IgniteSpiException {
    throw new UnsupportedOperationException();
}
  
/**
 * @param dest Destination cluster node to communicate with.
 * @param out Channel to write data.
 * @throws IgniteSpiException If fails.
 */
public default void sendOnPipe(ClusterNode dest, WritableByteChannel out) throws IgniteSpiException {
    throw new UnsupportedOperationException();
}

Internal API

Tcp connection listener

Rebalance checkpointing on supplier

Recovery from temporary WAL on demander

Questions

References

  1. Zero Copy I: User-Mode Perspective – https://www.linuxjournal.com/article/6345
  2. Example: Efficient data transfer through zero copy – https://www.ibm.com/developerworks/library/j-zerocopy/index.html
  3. Persistent Store Overview#6.PartitionRecovery