Rebalancing an Apache Ignite cluster redistributes primary and backup data copies across the new set of peer nodes according to the applied affinity function. Imbalanced data increases the likelihood of data loss and can skew peer utilization during data requests. A balanced set of data copies, on the other hand, evens out the request load and the disk resource consumption of each peer.
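To illustrate how an affinity function can map partitions onto a changing set of nodes, here is a minimal rendezvous (highest random weight) hashing sketch. It is not Ignite's `RendezvousAffinityFunction`: the class name, the scoring function, and the string node ids are assumptions made for illustration only. It shows the property that matters for rebalancing: when a node joins, the only partitions that change owner are the ones that move to the new node.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RendezvousSketch {
    /** Mixes a node id and a partition number into a pseudo-random score (hypothetical scoring function). */
    static long score(String nodeId, int part) {
        long h = nodeId.hashCode() * 31L + part;
        // 64-bit finalizer steps to spread nearby inputs over the whole range.
        h ^= h >>> 33;
        h *= 0xff51afd7ed558ccdL;
        h ^= h >>> 33;
        h *= 0xc4ceb9fe1a85ec53L;
        h ^= h >>> 33;
        return h;
    }

    /** Primary owner is the node with the highest score for the partition; ties are broken by node id. */
    static String owner(List<String> nodes, int part) {
        String best = null;
        for (String n : nodes) {
            if (best == null) {
                best = n;
                continue;
            }
            int cmp = Long.compare(score(n, part), score(best, part));
            if (cmp > 0 || (cmp == 0 && n.compareTo(best) > 0))
                best = n;
        }
        return best;
    }

    /** Counts partitions that change owner when newNode joins and checks that all of them move to newNode. */
    static int movedToNewNode(List<String> nodes, String newNode, int parts) {
        List<String> grown = new ArrayList<>(nodes);
        grown.add(newNode);
        int moved = 0;
        for (int p = 0; p < parts; p++) {
            String before = owner(nodes, p);
            String after = owner(grown, p);
            if (!before.equals(after)) {
                if (!after.equals(newNode))
                    throw new IllegalStateException("Partition " + p + " moved between old nodes");
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        int moved = movedToNewNode(Arrays.asList("A", "B", "C"), "D", 1024);
        System.out.println("Partitions relocated to the new node: " + moved + " of 1024");
    }
}
```

Each relocated partition in this sketch corresponds to data that a real cluster would have to transfer to the joining node.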
Currently, there are two types of the Apache Ignite cluster rebalancing:
Regardless of which rebalance mode is used, `SYNC` or `ASYNC` (defined in the `CacheRebalanceMode` enum), the Apache Ignite rebalance implementation has a number of limitations caused by its memory-centric design architecture:
- Although data is sent in batches (`GridDhtPartitionSupplyMessage` used), it is still processed entry by entry. Such a process has low impact with pure in-memory Apache Ignite usage, but it leads to additional fsyncs and WAL record logging when native persistence is enabled.
- With `setRebalanceThreadPoolSize` set to 1 and `setRebalanceBatchSize` set to 512K, thousands of key-value pairs are processed individually in a single thread. Such an approach has the following impact: `CacheDataStore` will traverse and modify each index tree N times, allocate space N times within the `FreeList` structure, and additionally have to store WAL page delta records, with an approximate complexity of ~ O(N*log(N));
- The rebalancing procedure doesn't utilize network and storage device throughput to its full extent, even with sufficiently large values of `setRebalanceThreadPoolSize`. For instance, following the common recommendation of N+1 threads (N being the number of CPU cores available) to increase rebalance speed will dramatically slow down computation performance on the demander node. This can be easily seen on the graphs below.
CPU utilization (supplier, demander) | |
---|---|
setRebalanceThreadPoolSize – 9; setRebalanceBatchSize – 512K | setRebalanceThreadPoolSize – 1; setRebalanceBatchSize – 512K |
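The O(N*log(N)) estimate quoted for entry-by-entry processing can be motivated with a short derivation. It assumes that inserting the k-th entry costs one index tree descent proportional to log k, with a hypothetical constant factor c; the source states only the final estimate.

```latex
\sum_{k=1}^{N} c \log k \;=\; c \log (N!) \;=\; \Theta(N \log N)
```

The last step follows from Stirling's approximation, $\log (N!) = N \log N - N + O(\log N)$.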
The most common case to which peer-2-peer partition file balancing can be applied is adding a completely new node, or a set of new nodes, to the cluster. Such a scenario implies full relocation of the cache partition files of all caches (suppose `RendezvousAffinityFunction` is used for all of them) to the new node. Transmitting partition files over the proposed low-level network socket communication rests on the following fundamental points:
- `scp`/`rsync` commands can be used. The test environment showed 270 MB/s compared with the current 40 MB/s single-threaded rebalance speed;
- The `FileChannel.transferTo()` call is routed to the `sendfile()` system call;

Apache Ignite needs to support peer-2-peer cache partition file transfer using a zero-copy algorithm based on an extension of the communication SPI.
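As a rough illustration of the zero-copy idea, the sketch below pushes a file into a `WritableByteChannel` with `FileChannel.transferTo()`. The class and method names are hypothetical, and the destination is another file channel only so that the example runs without a network. In the proposed design the target would be a socket channel, and whether the JVM actually uses `sendfile()` depends on the operating system and channel types involved. The sketch also assumes the file is not modified while it is being sent.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Random;

public class ZeroCopyTransferSketch {
    /** Sends the whole file to {@code out} and returns the number of bytes sent. */
    static long sendFile(Path file, WritableByteChannel out) throws IOException {
        try (FileChannel src = FileChannel.open(file, StandardOpenOption.READ)) {
            long size = src.size();
            long pos = 0;
            // transferTo() may move fewer bytes than requested, so keep going until done.
            while (pos < size)
                pos += src.transferTo(pos, size - pos, out);
            return pos;
        }
    }

    /** Writes random bytes to a temp file, transfers it to a second temp file, and compares the contents. */
    static boolean roundTrip(int bytes) {
        try {
            Path src = Files.createTempFile("part-", ".bin");
            Path dst = Files.createTempFile("part-copy-", ".bin");
            try {
                byte[] data = new byte[bytes];
                new Random(42).nextBytes(data);
                Files.write(src, data);
                try (FileChannel out = FileChannel.open(dst, StandardOpenOption.WRITE)) {
                    if (sendFile(src, out) != bytes)
                        return false;
                }
                return Arrays.equals(data, Files.readAllBytes(dst));
            }
            finally {
                Files.deleteIfExists(src);
                Files.deleteIfExists(dst);
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Round trip succeeded: " + roundTrip(1 << 20));
    }
}
```

The loop around `transferTo()` matters because the method is allowed to transfer fewer bytes than requested in a single call.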
The handshake message is patched to support the new type of connection.

    /** */
    private static final byte PIPE_DATA_TRANSFER_MASK = 0x01;

    /**
     * @return If socket will be used to transfer raw files.
     */
    public boolean usePipeTransfer() {
        return (flags & PIPE_DATA_TRANSFER_MASK) != 0;
    }

    /**
     * @param usePipeTransfer {@code True} if socket should be used to transfer raw files.
     */
    public final void usePipeTransfer(boolean usePipeTransfer) {
        flags = usePipeTransfer ? (byte)(flags | PIPE_DATA_TRANSFER_MASK) : (byte)(flags & ~PIPE_DATA_TRANSFER_MASK);
    }
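The bit-flag handling in the handshake patch can be tried out in isolation. The sketch below wraps the same mask logic in a hypothetical `HandshakeFlagsSketch` class (not an Ignite class) and starts from a flags byte that already has other bits set, to show that toggling the pipe-transfer bit leaves them untouched.

```java
public class HandshakeFlagsSketch {
    /** Bit 0 marks a connection used for raw file transfer (same mask value as in the patch). */
    private static final byte PIPE_DATA_TRANSFER_MASK = 0x01;

    /** Packed flags; the remaining bits may carry unrelated settings. */
    private byte flags;

    HandshakeFlagsSketch(byte initialFlags) {
        flags = initialFlags;
    }

    /** @return {@code true} if the pipe-transfer bit is set. */
    boolean usePipeTransfer() {
        return (flags & PIPE_DATA_TRANSFER_MASK) != 0;
    }

    /** Sets or clears only the pipe-transfer bit. */
    void usePipeTransfer(boolean usePipeTransfer) {
        flags = usePipeTransfer
            ? (byte)(flags | PIPE_DATA_TRANSFER_MASK)
            : (byte)(flags & ~PIPE_DATA_TRANSFER_MASK);
    }

    byte flags() {
        return flags;
    }

    public static void main(String[] args) {
        // 0x06 stands for two unrelated, hypothetical flags that are already set.
        HandshakeFlagsSketch msg = new HandshakeFlagsSketch((byte)0x06);
        msg.usePipeTransfer(true);
        System.out.println(msg.usePipeTransfer() + ", flags=" + msg.flags());
        msg.usePipeTransfer(false);
        System.out.println(msg.usePipeTransfer() + ", flags=" + msg.flags());
    }
}
```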
The communication SPI supports a new connection type to communicate with peers via sockets.

    /**
     * @return {@code True} if new type of direct connections supported.
     */
    public default boolean pipeConnectionSupported() {
        return false;
    }

    /**
     * @param src Source cluster node to initiate connection with.
     * @return Channel to listen.
     * @throws IgniteSpiException If fails.
     */
    public default ReadableByteChannel getRemotePipe(ClusterNode src) throws IgniteSpiException {
        throw new UnsupportedOperationException();
    }

    /**
     * @param dest Destination cluster node to communicate with.
     * @param out Channel to write data.
     * @throws IgniteSpiException If fails.
     */
    public default void sendOnPipe(ClusterNode dest, WritableByteChannel out) throws IgniteSpiException {
        throw new UnsupportedOperationException();
    }
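The default methods above make the new connection type opt-in: existing SPI implementations keep working, and callers are expected to check `pipeConnectionSupported()` before asking for a pipe. A minimal sketch of that pattern follows. The `PipeCapable` interface is a simplified, hypothetical stand-in for the SPI extension (a plain string replaces `ClusterNode`), and the pipe-aware implementation serves bytes from memory instead of a real socket.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

public class PipeCapabilitySketch {
    /** Hypothetical, simplified stand-in for the SPI extension. */
    interface PipeCapable {
        default boolean pipeConnectionSupported() {
            return false;
        }

        default ReadableByteChannel getRemotePipe(String srcNodeId) {
            throw new UnsupportedOperationException();
        }
    }

    /** An implementation that predates the extension simply inherits the defaults. */
    static class LegacySpi implements PipeCapable {
    }

    /** A pipe-aware implementation; it serves a fixed in-memory payload for the sake of the example. */
    static class InMemoryPipeSpi implements PipeCapable {
        private final byte[] payload;

        InMemoryPipeSpi(byte[] payload) {
            this.payload = payload;
        }

        @Override public boolean pipeConnectionSupported() {
            return true;
        }

        @Override public ReadableByteChannel getRemotePipe(String srcNodeId) {
            return Channels.newChannel(new ByteArrayInputStream(payload));
        }
    }

    /** Returns the number of bytes read from the pipe, or -1 if the caller has to fall back to another path. */
    static int receive(PipeCapable spi, String srcNodeId) {
        if (!spi.pipeConnectionSupported())
            return -1;

        ByteBuffer buf = ByteBuffer.allocate(1024);
        int total = 0;
        try (ReadableByteChannel ch = spi.getRemotePipe(srcNodeId)) {
            int n;
            while ((n = ch.read(buf)) != -1) {
                total += n;
                buf.clear(); // The data is discarded; a real receiver would write it to a partition file.
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("Legacy SPI: " + receive(new LegacySpi(), "node-1"));
        System.out.println("Pipe-aware SPI: " + receive(new InMemoryPipeSpi(new byte[3000]), "node-1"));
    }
}
```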