Overview

Rebalancing the Apache Ignite cluster balances the distribution of primary and backup data copies according to the applied affinity function on the new set of peer nodes. Imbalanced data increases the likelihood of data loss and can affect peer utilization during data requests. On the other hand, a balanced set of data copies optimizes each Ignite peer's request load and disk resource consumption.

...

  • In-memory batch rebalancing;
  • Historical (WAL) rebalancing (with native persistence enabled);

...

Current limitations

Regardless of which rebalance mode is used, SYNC or ASYNC (defined in the CacheRebalanceMode enum), the Apache Ignite rebalance implementation has a number of limitations caused by its memory-centric design architecture:

  • Although all cache data is sent between peers in batches (GridDhtPartitionSupplyMessage is used), entries are still processed one by one. Such an approach has low impact in a pure in-memory Apache Ignite use case, but it leads to additional fsyncs and WAL record logging when native persistence is enabled.

    By default, setRebalanceThreadPoolSize is set to 1 and setRebalanceBatchSize to 512K, which means that thousands of key-value pairs will be processed individually by a single thread. Such an approach has the following impact:
    • Extra unnecessary changes are needed to keep node data structures up to date. Adding each entry record into CacheDataStore will traverse and modify each index tree N times. It will allocate space N times within the FreeList structure and will additionally have to store WAL page delta records, with an approximate complexity of ~ O(N*log(N));
    • A batch with N entries will produce N records in the WAL, which might end up with N fsyncs (assuming the fsync WAL mode is configured);
    • An increased chance of long JVM pauses. The more auxiliary objects we produce by applying changes, the more often GC happens and the greater the chance that JVM pauses arise;

  • The rebalancing procedure doesn't utilize the network and storage device throughput to its full extent, even with sufficiently large values of setRebalanceThreadPoolSize. For instance, following the common recommendation of N+1 threads (where N is the number of CPU cores available) to increase rebalance speed will dramatically slow down computation performance on the demander node. This can be seen on the graphs below.

    CPU utilization (supplier, demander)

    [Image: CPU utilization graph]

    [Image: CPU utilization graph]

    setRebalanceThreadPoolSize – 9; setRebalanceBatchSize – 512K;
    setRebalanceThreadPoolSize – 1; setRebalanceBatchSize – 512K;
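
The fsync cost named in the limitations above (N entries leading to N WAL records and up to N fsyncs) can be illustrated with a toy log writer. This is a minimal sketch, not Ignite code: the class SyncCountSketch and its methods are hypothetical names, and FileChannel.force(true) stands in for the WAL fsync.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

/** Toy log writer (hypothetical, not Ignite code) that counts how often the file is forced to disk. */
final class SyncCountSketch {
    /** Appends each record and forces it to disk individually: N records cause N force() calls. */
    static int writePerEntry(Path log, List<byte[]> records) {
        int syncs = 0;

        try (FileChannel ch = open(log)) {
            for (byte[] rec : records) {
                writeFully(ch, ByteBuffer.wrap(rec));
                ch.force(true); // One fsync-like call per entry.
                syncs++;
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        return syncs;
    }

    /** Appends all records first and forces the file once: N records cause a single force() call. */
    static int writeBatched(Path log, List<byte[]> records) {
        try (FileChannel ch = open(log)) {
            for (byte[] rec : records)
                writeFully(ch, ByteBuffer.wrap(rec));

            ch.force(true); // One fsync-like call for the whole batch.
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        return 1;
    }

    /** Creates an empty temporary file to write the toy log into. */
    static Path tempLog() {
        try {
            return Files.createTempFile("wal-sketch", ".bin");
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static FileChannel open(Path log) throws IOException {
        return FileChannel.open(log, StandardOpenOption.CREATE, StandardOpenOption.WRITE,
            StandardOpenOption.TRUNCATE_EXISTING);
    }

    private static void writeFully(FileChannel ch, ByteBuffer buf) throws IOException {
        while (buf.hasRemaining())
            ch.write(buf);
    }
}
```

Both methods leave the same bytes on disk; only the number of synchronization points differs, which is the part that becomes expensive on slow storage.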
  • Design

  • Objective


Rebalance optimizations

Sending cache partitions

Apache Ignite needs to support peer-to-peer cache partition file transfer using a zero-copy algorithm based on an extension of the communication SPI. The most common case to which partition file rebalancing can be applied is adding a completely new node, or a set of new nodes, to the cluster. Such a scenario implies full relocation of cache partition files.

  • Streaming via CommunicationSpi

  • Handshake message

The handshake message is patched to support a new type of connection.

Code Block (java): HandshakeMessage2.java
/** Flag bit marking a connection that is used to transfer raw files. */
private static final byte PIPE_DATA_TRANSFER_MASK = 0x01;

/**
 * @return {@code True} if the socket will be used to transfer raw files.
 */
public boolean usePipeTransfer() {
    return (flags & PIPE_DATA_TRANSFER_MASK) != 0;
}

/**
 * @param usePipeTransfer {@code True} if the socket should be used to transfer raw files.
 */
public final void usePipeTransfer(boolean usePipeTransfer) {
    flags = usePipeTransfer ?
        (byte)(flags | PIPE_DATA_TRANSFER_MASK) : (byte)(flags & ~PIPE_DATA_TRANSFER_MASK);
}
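
To show how the flag accessors above behave in isolation, here is a standalone sketch. HandshakeFlagsSketch is a hypothetical stand-in for the real message class, and OTHER_FLAG_MASK is an invented second bit used only to show that unrelated flags are preserved.

```java
/** Standalone stand-in for the handshake message (hypothetical); only the flag handling is modelled. */
final class HandshakeFlagsSketch {
    /** Same bit as in the snippet above. */
    private static final byte PIPE_DATA_TRANSFER_MASK = 0x01;

    /** Invented second flag, present only to show that unrelated bits survive updates. */
    static final byte OTHER_FLAG_MASK = 0x02;

    /** Packed flags byte. */
    private byte flags;

    HandshakeFlagsSketch(byte initialFlags) {
        flags = initialFlags;
    }

    /** @return {@code true} if the pipe transfer bit is set. */
    boolean usePipeTransfer() {
        return (flags & PIPE_DATA_TRANSFER_MASK) != 0;
    }

    /** Sets or clears the pipe transfer bit without touching the other bits. */
    void usePipeTransfer(boolean usePipeTransfer) {
        flags = usePipeTransfer ?
            (byte)(flags | PIPE_DATA_TRANSFER_MASK) : (byte)(flags & ~PIPE_DATA_TRANSFER_MASK);
    }

    /** @return Raw flags byte. */
    byte flags() {
        return flags;
    }
}
```

Setting the bit on a message that already carries OTHER_FLAG_MASK yields 0x03, and clearing it again restores 0x02.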
  • Communication SPI extension

The communication SPI supports a new connection type to communicate with peers via sockets.

Code Block (java): CommunicationSpi.java
    /**
     * @return {@code True} if new type of direct connections supported.
     */
    public default boolean pipeConnectionSupported() {
        return false;
    }

    /**
     * @param src Source cluster node to initiate connection with.
     * @return Channel to listen.
     * @throws IgniteSpiException If fails.
     */
    public default ReadableByteChannel getRemotePipe(ClusterNode src) throws IgniteSpiException {
        throw new UnsupportedOperationException();
    }

    /**
     * @param dest Destination cluster node to communicate with.
     * @param out Channel to write data.
     * @throws IgniteSpiException If fails.
     */
    public default void sendOnPipe(ClusterNode dest, WritableByteChannel out) throws IgniteSpiException {
        throw new UnsupportedOperationException();
    }
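
Because the new methods are default methods that report "not supported" unless overridden, a caller can probe the SPI before choosing a transfer path. The sketch below illustrates that pattern under assumptions: PipeSpiSketch is a trimmed-down, hypothetical mirror of the methods above (the node is reduced to a String id), and falling back to batch messages is an assumed policy, not something this page specifies.

```java
import java.nio.channels.ReadableByteChannel;

/** Hypothetical, trimmed-down mirror of the SPI methods above; node identity is reduced to a String. */
interface PipeSpiSketch {
    /** @return {@code true} if the new type of direct connection is supported. */
    default boolean pipeConnectionSupported() {
        return false;
    }

    /** @return Channel to read from; unsupported unless an implementation overrides it. */
    default ReadableByteChannel getRemotePipe(String srcNodeId) {
        throw new UnsupportedOperationException();
    }
}

/** Caller-side check: use the pipe only when the SPI advertises support for it. */
final class TransferModeSketch {
    /** Transfer paths considered by this sketch. */
    enum Mode { PIPE_FILE_TRANSFER, BATCH_MESSAGES }

    /** Assumed policy: fall back to batch messages when pipes are not supported. */
    static Mode choose(PipeSpiSketch spi) {
        return spi.pipeConnectionSupported() ? Mode.PIPE_FILE_TRANSFER : Mode.BATCH_MESSAGES;
    }
}
```

An SPI implementation that does not override anything keeps the existing behavior, which is why the defaults return false and throw.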

...

p2p-type connection support by GridNioServer

...

Rebalance checkpointing on supplier

...

Recovery from temporary WAL on demander

...

Questions

...

When a new node joins, the partition files of all caches (assuming RendezvousAffinityFunction is used for all of them) are relocated to it. Transmitting partition files over the proposed low-level network socket communication implies the following fundamental things:

  • All data stored in a single partition file will be transmitted within a single batch (equal to the partition file) much faster and without the serialization/deserialization overhead. To roughly estimate the advantage of transmitting partition files over network sockets, the native Linux scp/rsync commands can be used. In the test environment they showed 270 MB/s against the current 40 MB/s single-threaded rebalance speed;
  • Zero-copy file transmission can be used [1]. The contents of a file can be transmitted without copying them through user space. Internally, this depends on the underlying operating system's support for zero copy. For instance, on UNIX and various flavors of Linux, the Java FileChannel.transferTo() call is routed to the sendfile() system call.
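
A minimal sketch of sending a whole file with FileChannel.transferTo() is shown below. The class and method names are hypothetical, and a second file channel is used as the target only to keep the example self-contained; in the rebalance scenario the target would be a socket channel. Whether the transfer is really zero-copy depends on the operating system and the channel types involved.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;

/** Hypothetical helper showing how a file can be pushed into a channel with transferTo(). */
final class ZeroCopySketch {
    /**
     * Sends the whole file to a blocking target channel.
     * transferTo() may move fewer bytes than requested, so it is called in a loop.
     *
     * @return Number of bytes sent.
     */
    static long sendFile(Path file, WritableByteChannel target) {
        try (FileChannel src = FileChannel.open(file, StandardOpenOption.READ)) {
            long size = src.size();
            long pos = 0;

            while (pos < size)
                pos += src.transferTo(pos, size - pos, target);

            return pos;
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Demo helper: sends the data through sendFile() into a second temp file and compares the result. */
    static boolean copiesIntact(byte[] data) {
        try {
            Path src = Files.createTempFile("part-src", ".bin");
            Path dst = Files.createTempFile("part-dst", ".bin");

            Files.write(src, data);

            try (FileChannel out = FileChannel.open(dst, StandardOpenOption.WRITE)) {
                if (sendFile(src, out) != data.length)
                    return false;
            }

            return Arrays.equals(data, Files.readAllBytes(dst));
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

No intermediate byte[] or ByteBuffer appears in sendFile(), which is what allows the JVM to hand the copy over to the operating system where that is supported.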

Cache batch insert/update


DataStreamer approach

The most suitable case for using DataStreamer for cluster rebalancing is when the user sets rebalanceThreadPoolSize to more than one thread. The DataStreamer is designed to reduce thread contention on batched data insert operations, but it may also lead to higher consumption of system resources. In some cases this can be acceptable (e.g. the user decides to donate some system resources to the rebalance procedure and reduce the total time of cluster balancing). One of the advantages of this approach is that it allows users to easily choose between system stress level and rebalance throughput.

There are two options which can be implemented:

  • Initial rebalance procedure (there is no data in the cache partitions) – the StreamReceiver ISOLATED_UPDATER can be used;
  • Preloading into a non-empty partition – the StreamReceiver INDIVIDUAL can be used.

Profiling current process

Environment

Cluster node

CPU(s): 56
CPU Model name: Intel(R) Xeon(R) CPU E5-2680 v4 @ 2.40GHz
SSD: HPE 960GB SATA 6G (Reads 530 MiB/s , Writes 490 MiB/s)
HDD: HP 9200 20TB SAS
Network I/O: 10 Gb/s full duplex bandwidth

Cluster configuration

The Apache Ignite cluster consists of two nodes, with a replicated cache configured with 1024 partitions and filled with dummy entries with a total size of 78 GB.
An example configuration for profiling the rebalance procedure with a persistence-enabled cache can be found in example-rebalance.xml.

Profiling java code

Persistence enabled

SSD


Code Block
batches            : 146938   
rows               : 77321844 
rows per batch     : 526      

time (total)       : 20 min   
cache size         : 78055 MB 
rebalance speed    : 63 MB/sec
rows per sec       : 62965 rows
batch per sec      : 119 batches


+ cache rebalance total                          : 1228051 ms : 100.00
+ + preload on demander                          : 1175260 ms : 95.70 
+ + + offheap().invoke(..)                       : 936040 ms  : 76.22 
+ + + + dataTree.invoke(..)                      : 895937 ms  : 72.96 
+ + + + + BPlusTree.invokeDown(..)               : 147120 ms  : 11.98 <<-!
+ + + + + FreeList.insertDataRow(..)             : 707925 ms  : 57.65 <<-!
+ + + + CacheDataStoreImpl.finishUpdate(..)      : 9154 ms    : 0.75  
+ + + ttl().addTrackedEntry(..)                  : 8149 ms    : 0.66  
+ + + wal().log(..)                              : 138571 ms  : 11.28 
+ + + continuousQueries().onEntryUpdated(..)     : 7287 ms    : 0.59  
+ message serialization                          : 1617 ms    : 0.13  
+ network delay between messages (total)         : 15129 ms   : 1.23  
+ make batch on supplier handleDemandMessage(..) : 240618 ms  : 19.85 
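
The summary figures in the SSD block above can be reproduced from the raw counters. The sketch below is only an illustration: the class name is hypothetical, and truncating the total time to whole seconds is an assumption about how the figures were rounded.

```java
/** Hypothetical helper that derives the summary figures from raw profiling counters. */
final class RebalanceStatsSketch {
    /** Average number of rows per supply batch (truncated). */
    static long rowsPerBatch(long rows, long batches) {
        return rows / batches;
    }

    /** Amount processed per second, with the total time truncated to whole seconds (assumption). */
    static long perSecond(long amount, long totalMs) {
        return amount / (totalMs / 1000);
    }

    /** Share of the total rebalance time spent in one stage, in percent. */
    static double sharePercent(long stageMs, long totalMs) {
        return 100.0 * stageMs / totalMs;
    }
}
```

With the SSD counters, rowsPerBatch(77321844, 146938) gives 526, perSecond(78055, 1228051) gives 63 (MB/sec), and perSecond(77321844, 1228051) gives 62965 (rows per second). sharePercent(707925, 1228051) is about 57.65, the FreeList.insertDataRow(..) share.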

HDD


Code Block
batches            : 146938   
rows               : 77321844 
rows per batch     : 526      

time (total)       : 40 min   
cache size         : 78055 MB 
rebalance speed    : 31 MB/sec
rows per sec       : 31470 rows

+ cache rebalance total                          : 2456973 ms : 100.00
+ + preload on demander                          : 2415154 ms : 98.30 
+ + + offheap().invoke(..)                       : 1640175 ms : 66.76 
+ + + + dataTree.invoke(..)                      : 1595260 ms : 64.93 
+ + + + + BPlusTree.invokeDown(..)               : 220390 ms  : 8.97  
+ + + + + FreeList.insertDataRow(..)             : 1340636 ms : 54.56 <<-!
+ + + + CacheDataStoreImpl.finishUpdate(..)      : 10807 ms   : 0.44  
+ + + ttl().addTrackedEntry(..)                  : 9678 ms    : 0.39  
+ + + wal().log(..)                              : 664680 ms  : 27.05 <<-!
+ + + continuousQueries().onEntryUpdated(..)     : 8521 ms    : 0.35  
+ message serialization                          : 1618 ms    : 0.07  
+ network delay between nodes                    : 7788 ms    : 0.32  
+ make batch on supplier handleDemandMessage(..) : 185749 ms  : 7.59  


In-memory only


Code Block
batches            : 150701    
rows               : 79355844  
rows per batch     : 526       

time (total)       : 5.5 min   
cache size         : 79852 MB  
rebalance speed    : 234 MB/sec
rows per sec       : 232715 rows

+ cache rebalance total                          : 341524 ms : 100.00
+ + preload on demander                          : 306950 ms : 89.88 
+ + + offheap().invoke(..)                       : 228015 ms : 66.76 
+ + + + dataTree.invoke(..)                      : 195239 ms : 57.17 
+ + + + + BPlusTree.invokeDown(..)               : 71207 ms  : 20.85 <<-!
+ + + + + FreeList.insertDataRow(..)             : 121611 ms : 35.61 <<-!
+ + + + CacheDataStoreImpl.finishUpdate(..)      : 9988 ms   : 2.92  
+ + + ttl().addTrackedEntry(..)                  : 10523 ms  : 3.08  
+ + + continuousQueries().onEntryUpdated(..)     : 9665 ms   : 2.83  
+ message serialization                          : 1307 ms   : 0.38  
+ network delay between nodes                    : 23409 ms  : 6.85  
+ make batch on supplier handleDemandMessage(..) : 90102 ms  : 26.39 

Resources utilization

REPLICATED Cache size (total): 83839 MB
Rebalance time (total): 1421295 ms
Rebalance speed (avg): 59 MB/s


CPU utilization

Image Added


CPU user time

Image Added


CPU io wait time

Image Added


SSD utilization

Image Added


Network utilization

Image Added

...


References

  1. Zero Copy I: User-Mode Perspective – https://www.linuxjournal.com/article/6345
  2. Example: Efficient data transfer through zero copy – https://www.ibm.com/developerworks/library/j-zerocopy/index.html
  3. Persistent Store Overview#6.PartitionRecovery