Overview

By rebalancing the Apache Ignite cluster, the distribution of primary and backup data copies would be balanced according to applied affinity function on the new set of peer nodesImbalanced data increases the likelihood of data loss and can affect peers utilization during data requests. On the other hand, a balanced set of data copies optimizes each peer requests load and each peer disk resources consumption.

Currently, there are two types of the Apache Ignite cluster rebalancing:

  • In-memory batch rebalancing;
  • Historical (WAL) rebalancing (the native presistence enabled);

Current limitations

Regardless of which rebalance mode is used SYNC or ASYNC (defined in CacheRebalanceMode enum), the Apache Ignite rebalance implementation has a number of limitations caused by a memory-centric desing architecture:

  • Although all cache data is sent between peers in batches (GridDhtPartitionSupplyMessage used) it still processes entries one by one. Such an process has low impact with a pure in-memory Apache Ignite usage but it leads to additional fsyncs and logging WAL records with the native persistence enabled. 

    By default, setRebalanceThreadPoolSize is set to 1 and setRebalanceBatchSize to 512K which means that thousands of key-value pairs will be processed single-thread and individually. Such an approach impacts on: 
    • The extra unnecessary changes to keep node data structures up to date. Adding each entry record into CacheDataStore will traverse and modify each index tree N-times. It will allocate the space N-times within FreeList structure and will have to additionally store WAL page delta records with approximate complexity ~ O(N*log(N));
    • Batch with N-entries will produce N-records in WAL which might end up with N fsyncs (assume fsync WAL mode configuration enabled);
    • Increased the chance of huge JVM pauses. The more serving objects we produce by applying changes, the more often GC happens and the greater chance of JVM pauses arise;

  • The rebalancing procedure doesn't utilize the network and storage device throughout to its full extent even with enough meaningful values of setRebalanceThreadPoolSize. For instance, trying to use a common recommendation of N+1 threads ( – the number of CPU cores available) to increase rebalance speed will drammatically slowdown computation performance on demander node. This can be easily shown on the graphs below.

    CPU utilization (supplier, demaner)

    setRebalanceThreadPoolSize – 9; setRebalanceBatchSize – 512K;setRebalanceThreadPoolSize – 1; setRebalanceBatchSize – 512K;

Rebalance optimizations

Sending cache partitions

One and the most common case to which the peer-2-peer partition file balancing can by apply – is adding a completely new node or the set of new nodes to the cluster. Such a scenario implies fully relocation of cache partition files of all caches (suppose RendezvousAffinityFunction used for all of them) to the new node. The partitition file transmitting over proposed low-level network socket communication signified the following fundamental things:

  • All data stored in the single partition file will be transmitted within single batch (equal to partition file) much faster and without the serealization\deserialization overhead. To roughly estimate the superiority of partition file transmitting using network sockets the native Linux scp\rsync commands can be used. The test environment showed us results – 270 MB/s over the current 40 MB/s single-threaded rebalance speed;
  • The zero-copy file transmission can be used [1]. The contents of a file can be transmitted without copying them through the user space. Internally, it depends on the underlying operating system's support for zero copy. For instance, in UNIX and various flavors of Linux, the Java method FileChannel.transfertTo() call is routed to the sendfile() system call;

Cache batch insert/update


DataStreamer approach

The most suitable case of using DataStreamer for the cluster rebalancing is the case when the user sets the rebalanceThreadPoolSize greater than one thread. The DataStreamer is designed to reduce thread contention on batched data insert operations, but it may lead to higher system resources degradation as well. In some of the cases it can be acceptable (e.g. user decides to donate some system resources to the rebalance procedure and reduce the total time of cluster balancing). One of the advantages of this approach is that it will allow users to choose between system stress level and rebalance throughput easily.

There are two options which can be implemented:

  • Initial rebalance procedure (there is no data in cache partitions) – the StreamReceiver ISOLATED_UPDATER can be used
  • Preloading to the non-empty partition – the StreamReceiver INDIVIDUAL can be used

Profiling current process

Environment

Cluster node

CPU(s): 56
CPU Model name: Intel(R) Xeon(R) CPU E5-2680 v4 @ 2.40GHz
SSD: HPE 960GB SATA 6G (Reads 530 MiB/s , Writes 490 MiB/s)
HDD: HP 9200 20TB SAS
Network I\O10Gb/s full duplex bandwidth

Cluster configuration

The Apache Ignite cluster configured to be consisting of two nodes, replicated cache configured with 1024 partitions and filled with dummy entries total size of 78 Gb.
The example configuration of profiling rebalance procedure with persistence enabled cache can be found – example-rebalance.xml

Profiling java code

Persistence enabled

SSD


batches            : 146938   
rows               : 77321844 
rows per batch     : 526      

time (total)       : 20 min   
cache size         : 78055 MB 
rebalacne speed    : 63 MB\sec
rows per sec       : 62965 rows
batch per sec      : 119 batches


+ cache rebalance total                          : 1228051 ms : 100.00
+ + preload on demander                          : 1175260 ms : 95.70 
+ + + offheap().invoke(..)                       : 936040 ms  : 76.22 
+ + + + dataTree.invoke(..)                      : 895937 ms  : 72.96 
+ + + + + BPlusTree.invokeDown(..)               : 147120 ms  : 11.98 <<-!
+ + + + + FreeList.insertDataRow(..)             : 707925 ms  : 57.65 <<-!
+ + + + CacheDataStoreImpl.finishUpdate(..)      : 9154 ms    : 0.75  
+ + + ttl().addTrackedEntry(..)                  : 8149 ms    : 0.66  
+ + + wal().log(..)                              : 138571 ms  : 11.28 
+ + + continuousQueries().onEntryUpdated(..)     : 7287 ms    : 0.59  
+ message serialization                          : 1617 ms    : 0.13  
+ network delay between meesages (total)         : 15129 ms   : 1.23  
+ make batch on supplier handleDemandMessage(..) : 240618 ms  : 19.85 

HDD


batches            : 146938   
rows               : 77321844 
rows per batch     : 526      

time (total)       : 40 min   
cache size         : 78055 MB 
rebalacne speed    : 31 MB\sec
rows per sec       : 31470 rows

+ cache rebalance total                          : 2456973 ms : 100.00
+ + preload on demander                          : 2415154 ms : 98.30 
+ + + offheap().invoke(..)                       : 1640175 ms : 66.76 
+ + + + dataTree.invoke(..)                      : 1595260 ms : 64.93 
+ + + + + BPlusTree.invokeDown(..)               : 220390 ms  : 8.97  
+ + + + + FreeList.insertDataRow(..)             : 1340636 ms : 54.56 <<-!
+ + + + CacheDataStoreImpl.finishUpdate(..)      : 10807 ms   : 0.44  
+ + + ttl().addTrackedEntry(..)                  : 9678 ms    : 0.39  
+ + + wal().log(..)                              : 664680 ms  : 27.05 <<-!
+ + + continuousQueries().onEntryUpdated(..)     : 8521 ms    : 0.35  
+ message serialization                          : 1618 ms    : 0.07  
+ network delay between nodes                    : 7788 ms    : 0.32  
+ make batch on supplier handleDemandMessage(..) : 185749 ms  : 7.59  


In-memory only


batches            : 150701    
rows               : 79355844  
rows per batch     : 526       

time (total)       : 5.5 min   
cache size         : 79852 MB  
rebalacne speed    : 234 MB\sec
rows per sec       : 232715 rows

+ cache rebalance total                          : 341524 ms : 100.00
+ + preload on demander                          : 306950 ms : 89.88 
+ + + offheap().invoke(..)                       : 228015 ms : 66.76 
+ + + + dataTree.invoke(..)                      : 195239 ms : 57.17 
+ + + + + BPlusTree.invokeDown(..)               : 71207 ms  : 20.85 <<-!
+ + + + + FreeList.insertDataRow(..)             : 121611 ms : 35.61 <<-!
+ + + + CacheDataStoreImpl.finishUpdate(..)      : 9988 ms   : 2.92  
+ + + ttl().addTrackedEntry(..)                  : 10523 ms  : 3.08  
+ + + continuousQueries().onEntryUpdated(..)     : 9665 ms   : 2.83  
+ message serialization                          : 1307 ms   : 0.38  
+ network delay between nodes                    : 23409 ms  : 6.85  
+ make batch on supplier handleDemandMessage(..) : 90102 ms  : 26.39 

Resources utilization

REPLICATED Cache size (total): 83839 MB
Rebalance time (total): 1421295 ms
Rebalance speed (avg): 59 MB/s


CPU utilization

CPU user time

CPU io wait time

SSD utilization

Network utilization


References

  1. Zero Copy I: User-Mode Perspective – https://www.linuxjournal.com/article/6345
  2. Example: Efficient data transfer through zero copy – https://www.ibm.com/developerworks/library/j-zerocopy/index.html
  3. Persistent Store Overview#6.PartitionRecovery
  • No labels