Introduction

By default, the logs are sent from the leader to the followers by the threads that created them, i.e., the client threads. This improves network utilization because the leader does not have to wait for a follower's response before sending the next log. The downside is that the arrival order of the logs is no longer guaranteed, so a follower may have to wait for earlier logs before processing the current one, since the Raft algorithm requires logs to be appended in strict order. For example, assume there are 5 threads on the leader side, generating log1, log2, ..., log5 respectively, and sending their logs to the followers concurrently. Because the logs are sent concurrently and the scheduling of the underlying network layer cannot be controlled, they may reach a follower in an arbitrary order. Ideally, the receiving order matches the log indices, i.e., log1, log2, ..., log5, so the follower can append the logs one by one without any waiting. In the worst case, however, the logs arrive in a completely reversed order, i.e., log5, log4, ..., log1; since their predecessors are missing, the later logs must be held until log1 arrives before any of them can be processed. Fig.1 shows the timelines of the ideal receiving order and the worst one (using three logs). It clearly takes much longer to finish processing the logs when they arrive in reversed order, and the first-arrived log3 must wait a long time before it can be processed. The CPU is idle during these waits, lowering the resource utilization of the system and hurting the overall performance.

Besides the potentially long idle time caused by out-of-order logs, frequent thread contention is also induced. When a log arrives but is not the immediate successor of the local last log, its thread waits on a condition provided by RaftLogManager. Whenever a new log is appended to RaftLogManager, the waiting threads are woken up and check again whether their previous log has arrived. The problem is that all waiting logs share the same condition, so every append wakes all of them up, yet only one of them can proceed and the others must wait again. This changes the status of many threads while only one of the changes is necessary, and when the number of waiting threads exceeds the number of available cores, the threads must compete for a core just to decide whether to proceed or keep waiting. During this contention, the right thread (the one holding the successor log) may lose several times before it is scheduled onto a core, delaying the append of the next log and leaving the other threads in the waiting status even longer.
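
To make the contention concrete, here is a minimal sketch (not the actual IoTDB code; the class and field names are illustrative) of the single-condition scheme described above: every out-of-order log waits on the same monitor, so each append wakes all waiting threads although only one of them can proceed.

// Simplified illustration of the single-condition scheme. Log indices are
// assumed to start from 1, so lastLogIndex starts at 0.
public class SingleConditionLogManager {

  private final Object logUpdateCondition = new Object();
  private long lastLogIndex = 0;

  /** Blocks until the previous log (index - 1) has been appended, then appends. */
  public void append(long index) throws InterruptedException {
    synchronized (logUpdateCondition) {
      // Every waiting thread re-checks this after each notifyAll(), but at most
      // one of them holds the log that directly follows lastLogIndex.
      while (index != lastLogIndex + 1) {
        logUpdateCondition.wait();
      }
      lastLogIndex = index;
      // Wake up ALL waiting threads; the ones whose predecessors are still
      // missing immediately go back to waiting.
      logUpdateCondition.notifyAll();
    }
  }
}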

We have developed a structure called LogDispatcher that keeps the logs ordered when a follower receives them, but it cannot fully utilize the network bandwidth. LogDispatcher will be introduced in another document; this one focuses on reducing thread contention when LogDispatcher is not used.


Figure.1 Ideal receiving order vs worst receiving order

Method

The method is quite intuitive: if we wake up only the wanted thread, the other threads stay waiting and do not compete with it. However, Java does not provide a handy API to wake up a specific thread; all we can do is wake up one arbitrary thread with notify() or wake up all threads with notifyAll(). Neither is suitable for our situation, so we must work around it another way. Although we cannot choose which thread to wake up, we can choose which thread waits on which condition, provided there are multiple conditions. We split the single condition into k conditions: the log whose index is i waits on condition (i - 1) % k, and once it is appended, it wakes up all threads waiting on condition i % k (with notifyAll()). If k is reasonably large, each thread ends up waiting on a separate condition, so notifying a condition wakes up only one thread and there is no contention. Fig.2 shows the benefit of multiple conditions over a single one when there are three threads, one CPU core, and two conditions. With only one condition, after log1 is appended, thread3 may be switched onto the core, but it cannot proceed because log2 is still missing from RaftLogManager, so it returns to the waiting status. With multiple conditions this waste is avoided: only thread2 is notified after log1 is appended, and it can proceed without further waiting. A sketch of the scheme is given after Fig.2.

Figure.2 Multiple conditions reduce thread contention
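
The sketch below illustrates the scheme. It is a simplified, hypothetical class rather than the real RaftLogManager: log indices are assumed to start from 1, the actual log storage is omitted, and only the wait/notify bookkeeping is shown.

import java.util.concurrent.atomic.AtomicLong;

// Multi-condition scheme: the log with index i waits on condition (i - 1) % k
// and, once appended, notifies the threads waiting on condition i % k, i.e.
// exactly the thread holding log i + 1 when k is large enough.
public class MultiConditionLogManager {

  private final Object[] conditions;
  // Log indices are assumed to start from 1, so the initial value is 0.
  private final AtomicLong lastLogIndex = new AtomicLong(0);

  public MultiConditionLogManager(int conditionNum) {
    conditions = new Object[conditionNum];
    for (int i = 0; i < conditionNum; i++) {
      conditions[i] = new Object();
    }
  }

  /** Blocks until log (index - 1) is appended, appends log `index`, then wakes its successor. */
  public void append(long index) throws InterruptedException {
    Object waitCondition = conditions[(int) ((index - 1) % conditions.length)];
    synchronized (waitCondition) {
      // Only threads whose wait-condition collides with ours are woken up here;
      // with enough conditions that is just the thread holding log `index`.
      while (lastLogIndex.get() != index - 1) {
        waitCondition.wait();
      }
    }

    // ... the real work of appending the log entry would happen here ...

    Object notifyCondition = conditions[(int) (index % conditions.length)];
    synchronized (notifyCondition) {
      lastLogIndex.set(index);
      // Wake only the thread(s) waiting for log `index` to be appended.
      notifyCondition.notifyAll();
    }
  }
}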

Evaluation

We use a simulation to evaluate the effect of multiple conditions. We create k producer threads simulating k clients; each creates empty logs, using a shared AtomicLong to generate increasing, unique log indices, appends each log to a shared RaftMember, and sleeps for t ms after each append to simulate network latency and client-side data preparation. The number of conditions is n. We take the throughput (how many new logs can be appended per second) as the measurement, and the reported result is the average over a 30-second run, which should be enough since only CPU operations are involved.
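
The following is a rough sketch of such a simulation. It reuses the hypothetical MultiConditionLogManager from the Method section and appends to it directly instead of going through a RaftMember; the names and structure are illustrative, not the actual benchmark code.

import java.util.concurrent.atomic.AtomicLong;

// k producer threads draw unique, increasing indices from a shared AtomicLong,
// append to a shared manager with n conditions, and sleep t ms after each append.
public class AppendSimulation {

  public static void main(String[] args) throws InterruptedException {
    int producerNum = 50;      // k
    int conditionNum = 100;    // n
    long sleepMs = 1;          // t
    long durationMs = 30_000;  // measure for 30 seconds

    MultiConditionLogManager manager = new MultiConditionLogManager(conditionNum);
    AtomicLong nextIndex = new AtomicLong(1);  // shared log index generator
    AtomicLong appended = new AtomicLong(0);   // counts successful appends
    long deadline = System.currentTimeMillis() + durationMs;

    Thread[] producers = new Thread[producerNum];
    for (int i = 0; i < producerNum; i++) {
      producers[i] = new Thread(() -> {
        try {
          while (System.currentTimeMillis() < deadline) {
            long index = nextIndex.getAndIncrement();
            manager.append(index);             // may wait for the previous log
            appended.incrementAndGet();
            if (sleepMs > 0) {
              Thread.sleep(sleepMs);           // simulated network/client latency
            }
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      producers[i].start();
    }
    for (Thread producer : producers) {
      producer.join();
    }
    System.out.println("throughput (ops): " + appended.get() * 1000 / durationMs);
  }
}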

The experiments are run on a machine with an i7-8700 (4C8T), JDK 1.8.10, and Windows 10; the IoTDB commit ID is e7b1421a544cba0ac5f42f4d424d01a997619464.

We choose k = [1, 2, 4, 10, 50, 100, 200, 500], n = [1, 10, 100, 1000], and t = [0, 1, 5] ms in our evaluation. The results for different t are shown in Table.1; some results are omitted because there is no point in increasing n once it is no less than k: each thread can already wait on a separate condition, and adding more conditions provides no further benefit.

Table.1 Experimental results

sleep time t = 0ms (throughput with N conditions, ops)
ProducerNum k    N=1        N=10       N=100      N=1000
1                1097915    /          /          /
2                165629     169126     /          /
4                95561      158491     /          /
10               32276      158331     /          /
50               6879       53628      146976     /
100              3532       22866      146835     /
200              1808       11626      79125      127831
500              861        1496       5560       16340

sleep time t = 1ms (throughput with N conditions, ops)
ProducerNum k    N=1        N=10       N=100      N=1000
1                592        /          /          /
2                1158       1141       /          /
4                2198       2179       /          /
10               5001       5229       /          /
50               16685      25119      24826      /
100              5370       42411      47994      /
200              1834       13908      93817      98849
500              850        1478       5515       14320

sleep time t = 5ms (throughput with N conditions, ops)
ProducerNum k    N=1        N=10       N=100      N=1000
1                180        /          /          /
2                360        360        /          /
4                717        714        /          /
10               1754       1729       /          /
50               7014       8206       8345       /
100              13339      15653      16298      /
200              1883       32306      32364      31319
500              884        1585       12016      74208

The highest throughput is reached when there is only one producer and no sleep after each append. This is because the log append itself is very lightweight, and with only one thread, lock competition for the RaftLogManager is avoided entirely. If there is more than one thread, some threads must wait and then be woken up, which is much heavier than the append itself and causes a performance drop. With no sleep time and only one condition, throughput drops almost linearly as k increases: since all producers wait on the same condition, each notify wakes all of them up, and the contention is proportional to the total number of producers. With multiple conditions, throughput is significantly improved, and when k < N the results are similar because thread contention is entirely removed. Performance still drops as k increases, because the cost of switching threads becomes critical, yet it remains beneficial to increase the number of conditions and avoid thread contention.

When the append operation becomes more costly (simulated by increasing the sleep time t), one thread is no longer sufficient to reach the maximum throughput, which better matches real deployments where sending large logs is time-consuming. With only one condition, 50 producers are needed to reach the performance peak. With more conditions, both the peak performance and the k needed to reach it increase, because the CPU time saved from thread contention requires more threads to be fully utilized. But when there are too many threads (500), the thread-switching cost becomes dominant and the performance is very poor. Interestingly, compared with no sleep time at k = 200 and N = 100, sleeping for 1 ms even increases the throughput (from 79125 to 93817), probably because a proper sleep time keeps the number of runnable threads low and reduces the cost of switching threads; however, since the interval between two adjacent requests cannot be controlled in production, we will not discuss this further.

The results are similar when the sleep time is longer; as the CPU becomes idle more often, more threads are needed to keep it busy, so the peak performance is reached at k = 100 when N = 1. Using more conditions again improves performance. One interesting observation is that when k = 200 and t = 5, 10 conditions are already enough to reach the maximum performance. Note that having k producers does not mean k conditions are always occupied, because some producers may be sleeping and thus not waiting on any condition. In this configuration, CPU scheduling may coincidentally keep the number of threads waiting on conditions below 10, so 10 conditions happen to be enough. Since CPU scheduling cannot be controlled from the application's point of view, using enough conditions (N > k) gives a better performance guarantee.

Conclusion

Splitting the condition used to wait for previous logs significantly reduces the number of threads woken up when a new log is appended. This decreases the resulting thread contention when there are not enough idle CPU cores, as well as the cost of switching threads that cannot proceed onto cores. CPU time is therefore saved from unnecessary thread status changes and switches, other operations can be processed in time, and the overall performance improves. The improvement also comes at a very low cost: it only requires creating an array of conditions (which are just instances of the Java Object class) and computing the corresponding condition before each wait and notify (the log index modulo the size of the array).

In general, using more than 1000 clients will not provide further benefit on most servers, as their core count is usually no more than one or two hundred. Therefore we fix the number of conditions at 1024 for each RaftLogManager. It may become configurable in the future, but we believe this default is enough for most situations.


1 Comment

  1. Removal of thread correlation is a great technique to improve concurrency.

    In this situation, instead of multiple conditions, typically it would be a consuming thread pool waiting on a queue of tasks (I like C/C++).

    IoTDB does have a really high thread count, which causes thread context-switching competition and hence the lower throughput on intensive memory operations.

    By the way, there is almost no need for sleep ever.