...

  • Although all cache data is sent between peers in batches (GridDhtPartitionSupplyMessage is used), entries are still processed one by one. This approach has little impact in a pure in-memory use case, but with Apache Ignite native persistence enabled it leads to additional fsyncs and WAL record logging. 

    By default, setRebalanceThreadPoolSize is set to 1 and setRebalanceBatchSize to 512K, which means that thousands of key-value pairs will be processed single-threaded and one by one (a configuration sketch for these settings is given after the graphs below). This approach results in: 
    • Extra, unnecessary changes to keep a node's data structures up to date. Adding each entry record into CacheDataStore traverses and modifies each index tree N times. It also allocates space N times within the FreeList structure and has to additionally store WAL page delta records, with approximate complexity ~ O(N*log(N));
    • A batch with N entries will produce N records in the WAL, which might end up with N fsyncs (assuming the fsync WAL mode is configured);
    • An increased chance of long JVM pauses. The more attendant objects we produce by applying changes, the more GC happens and the greater the chance of JVM pauses;

  • The rebalancing procedure doesn't utilize the network and storage device throughput to the full extent, even with reasonably large values of setRebalanceThreadPoolSize. For instance, trying to follow the common recommendation of N+1 threads (where N is the number of CPU cores available) to increase rebalance speed will dramatically slow down computation performance on the demander node. This is easily seen on the graphs below.

    CPU utilization (supplier, demander)

    [Image: setRebalanceThreadPoolSize = 9, setRebalanceBatchSize = 512K]

    [Image: setRebalanceThreadPoolSize = 1, setRebalanceBatchSize = 512K]

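For reference, the defaults discussed above can be changed through the public configuration API. The following is a minimal sketch only; the thread count of 4 and the cache name "partitioned" are arbitrary illustrations, not recommendations.

Code Block (java): RebalanceConfigSketch.java
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;

public class RebalanceConfigSketch {
    public static void main(String[] args) {
        IgniteConfiguration cfg = new IgniteConfiguration();

        // Number of threads that process rebalance supply messages on the demander (default is 1).
        cfg.setRebalanceThreadPoolSize(4);

        CacheConfiguration<Integer, byte[]> cacheCfg = new CacheConfiguration<>("partitioned");

        // Size of a single rebalance batch (GridDhtPartitionSupplyMessage) in bytes (default is 512 KB).
        cacheCfg.setRebalanceBatchSize(512 * 1024);

        cfg.setCacheConfiguration(cacheCfg);

        Ignition.start(cfg);
    }
}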
  • Design

  • Objective

Apache Ignite needs to support peer-to-peer cache partition file transfer using a zero-copy algorithm, based on an extension of the communication SPI. 
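
To illustrate the zero-copy part (a minimal sketch of the underlying JDK mechanism, not the actual Ignite implementation), a partition file can be pushed to a peer socket with FileChannel.transferTo, so the bytes never pass through a user-space buffer:

Code Block (java): ZeroCopySendSketch.java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ZeroCopySendSketch {
    /** Sends the whole partition file to the given peer address using zero-copy transfer. */
    public static void send(Path partFile, InetSocketAddress peer) throws IOException {
        try (FileChannel src = FileChannel.open(partFile, StandardOpenOption.READ);
             SocketChannel dest = SocketChannel.open(peer)) {
            long pos = 0;
            long size = src.size();

            // transferTo may send fewer bytes than requested, so loop until the file is fully sent.
            while (pos < size)
                pos += src.transferTo(pos, size - pos, dest);
        }
    }
}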

  • Streaming via CommunicationSpi

  • Handshake message

The handshake message is patched to support the new connection type.

Code Block (java): HandshakeMessage2.java
/** Flag mask indicating that the socket will be used to transfer raw files. */
private static final byte PIPE_DATA_TRANSFER_MASK = 0x01;

/**
 * @return If socket will be used to transfer raw files.
 */
public boolean usePipeTransfer() {
    return (flags & PIPE_DATA_TRANSFER_MASK) != 0;
}

/**
 * @param usePipeTransfer {@code True} if socket should be used to transfer raw files.
 */
public final void usePipeTransfer(boolean usePipeTransfer) {
    flags = usePipeTransfer ? 
		(byte)(flags | PIPE_DATA_TRANSFER_MASK) : (byte)(flags & ~PIPE_DATA_TRANSFER_MASK);
}


  • Extension of communication SPI

The communication SPI is extended with a new connection type for communicating with peers directly via sockets.

Code Block (java): CommunicationSpi.java
    /**
     * @return {@code True} if new type of direct connections supported.
     */
    public default boolean pipeConnectionSupported() {
        return false;
    }

    /**
     * @param src Source cluster node to initiate connection with.
     * @return Channel to listen.
     * @throws IgniteSpiException If fails.
     */
    public default ReadableByteChannel getRemotePipe(ClusterNode src) throws IgniteSpiException {
        throw new UnsupportedOperationException();
    }

    /**
     * @param dest Destination cluster node to communicate with.
     * @param out Channel to write data.
     * @throws IgniteSpiException If fails.
     */
    public default void sendOnPipe(ClusterNode dest, WritableByteChannel out) throws IgniteSpiException {
        throw new UnsupportedOperationException();
    }
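
A sketch of how a demander could use the proposed getRemotePipe(...) method to stream a partition file straight to disk; the surrounding method and parameter names are assumptions for illustration only:

Code Block (java): demander-side usage (sketch)
// Sketch only: 'spi' is the extended CommunicationSpi from the block above,
// 'supplier' is the node that owns the partition, and 'partFile'/'expectedSize'
// are illustrative parameters. Assumes java.nio.* and org.apache.ignite.* imports.
void receivePartition(CommunicationSpi<?> spi, ClusterNode supplier,
    Path partFile, long expectedSize) throws Exception {
    try (ReadableByteChannel in = spi.getRemotePipe(supplier);
         FileChannel dest = FileChannel.open(partFile,
             StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
        long pos = 0;

        // transferFrom may move fewer bytes than requested, so loop until
        // the whole partition file has been received.
        while (pos < expectedSize)
            pos += dest.transferFrom(in, pos, expectedSize - pos);
    }
}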

  • TCP connection listener for TcpCommunicationSpi

  • p2p-type connection support by GridNioServer

  • Rebalance checkpointing on supplier

  • Recovery from temporary WAL on demander

  • Questions

  • How many streaming connections will be supported by CommunicationSpi?
    A single connection between a pair of nodes.
  • Will the bandwidth of a CommunicationSpi connection be controllable at runtime?
    Yes.
  • Create Standalone
  • How to choose the best rebalance type when a new node joins the topology?
  • ASYNC and SYNC cache rebalancing via p2p?