Introduction

Kafka implements several request types that cannot immediately be answered with a response. Examples:

  • A produce request with acks=all cannot be considered complete until all replicas have acknowledged the write and we can guarantee it will not be lost if the leader fails.
  • A fetch request with min.bytes=1 won't be answered until there is at least one new byte of data for the consumer to consume. This allows a "long poll" so that the consumer need not busy wait checking for new data to arrive.

These requests are considered complete when either (a) the criteria they requested are satisfied or (b) some timeout occurs.

We intend to expand the use of this delayed request facility for various additional purposes including partition assignment and potentially quota enforcement.

The number of these asynchronous operations in flight at any time scales with the number of connections, which for Kafka is often tens of thousands.

A naive implementation would simply block a thread per request until its criteria are met, but this would not scale to the number of in-flight requests Kafka handles.

The current approach uses a data structure called the "request purgatory". The purgatory holds any request that hasn't yet met its criteria to succeed but also hasn't yet resulted in an error. This structure holds onto these uncompleted requests and allows non-blocking event-based generation of the responses. This approach is obviously better than having a thread per in-flight request but our implementation of the data structure that accomplishes this has a number of deficiencies. The goal of this proposal is to improve the efficiency of this data structure.

Current Design

The request purgatory consists of a timeout timer and a hash map of watcher lists for event driven processing. A request is put into the purgatory when it is not immediately satisfiable because of unmet conditions. A request in the purgatory is completed later when the conditions are met, or is forced to complete (timed out) when it passes the time specified in the timeout parameter of the request. Currently (0.8.x) it uses java.util.concurrent.DelayQueue to implement the timer.

When a request is completed, the request is not deleted from the timer or watcher lists immediately. Instead, completed requests are deleted as they are found during condition checking. When deletion does not keep up, the server may exhaust the JVM heap and cause an OutOfMemoryError. To alleviate this, the reaper thread purges completed requests from the purgatory when the number of requests in the purgatory (both pending and completed) exceeds a configured threshold. The purge operation scans the timer queue and all watcher lists to find completed requests and deletes them.

By setting this configuration parameter low, the server can largely avoid the memory problem. However, the server pays a significant performance penalty if it scans all lists too frequently.

New Design

The goal of the new design is to allow immediate deletion of a completed request and to significantly reduce the load of the expensive purge process. This requires cross-referencing entries in the timer and the requests. It is also strongly desirable to have O(1) insert/delete cost, since an insert and a delete happen for every request and completion.

To satisfy these requirements, we propose a new purgatory implementation based on Hierarchical Timing Wheels.

Hierarchical Timing Wheel

A simple timing wheel is a circular list of buckets of timer tasks. Let u be the time unit. A timing wheel with size n has n buckets and can hold timer tasks spanning an n * u time interval. Each bucket holds the timer tasks that fall into the corresponding time range. At the beginning, the first bucket holds tasks for [0, u), the second bucket holds tasks for [u, 2u), …, and the n-th bucket holds tasks for [u * (n - 1), u * n). Every interval of the time unit u, the timer ticks and moves to the next bucket, then expires all timer tasks in it. So, the timer never inserts a task into the bucket for the current time, since such a task is already expired; the timer runs an expired task immediately. The emptied bucket is then available for the next round, so if the current bucket is for time t, it becomes the bucket for [t + u * n, t + u * (n + 1)) after a tick. A timing wheel has O(1) cost for insert/delete (start-timer/stop-timer), whereas priority-queue-based timers, such as java.util.concurrent.DelayQueue and java.util.Timer, have O(log n) insert/delete cost.
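To make the bucket arithmetic concrete, below is a minimal single-level timing wheel sketch in Java. This is for exposition only; the class and method names are invented and this is not Kafka's actual timer code.

    import java.util.ArrayList;
    import java.util.List;

    // Minimal single-level timing wheel sketch: u = tickMs, n = wheelSize.
    // The wheel can only hold tasks expiring in [currentTime + tickMs, currentTime + tickMs * wheelSize).
    class SimpleTimingWheel {
        private final long tickMs;
        private final int wheelSize;
        private final List<List<Runnable>> buckets = new ArrayList<>();
        private long currentTime;                      // always rounded down to a multiple of tickMs

        SimpleTimingWheel(long tickMs, int wheelSize, long startMs) {
            this.tickMs = tickMs;
            this.wheelSize = wheelSize;
            this.currentTime = startMs - (startMs % tickMs);
            for (int i = 0; i < wheelSize; i++) buckets.add(new ArrayList<>());
        }

        // O(1) insert: the bucket index is pure arithmetic on the expiration time.
        // Returns false if the task is already expired or overflows the wheel.
        boolean add(Runnable task, long expirationMs) {
            if (expirationMs < currentTime + tickMs) return false;              // already expired
            if (expirationMs >= currentTime + tickMs * wheelSize) return false; // overflow
            buckets.get((int) ((expirationMs / tickMs) % wheelSize)).add(task);
            return true;
        }

        // One tick: advance the clock by u and expire every task in the bucket it lands on.
        // The emptied bucket is then reused for the interval [t + u * n, t + u * (n + 1)).
        void tick() {
            currentTime += tickMs;
            int idx = (int) ((currentTime / tickMs) % wheelSize);
            List<Runnable> expired = buckets.set(idx, new ArrayList<>());
            for (Runnable task : expired) task.run();
        }
    }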

A major drawback of a simple timing wheel is that it assumes a timer request falls within the time interval of n * u from the current time. If a timer request is outside this interval, it is an overflow. A hierarchical timing wheel deals with such overflows. It is a hierarchically organized set of timing wheels. The lowest level has the finest time resolution; as we move up the hierarchy, time resolutions become coarser. If the resolution of a wheel at one level is u and its size is n, the resolution of the next level is n * u. At each level, overflows are delegated to the wheel one level higher. When the wheel at the higher level ticks, it reinserts timer tasks into the lower level. An overflow wheel can be created on demand. When a bucket in an overflow wheel expires, all tasks in it are reinserted into the timer recursively; the tasks are then moved to finer-grained wheels or executed. The insert (start-timer) cost is O(m), where m is the number of wheels, which is usually very small compared to the number of requests in the system, and the delete (stop-timer) cost is still O(1).
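A rough sketch of the hierarchy follows, again with invented names and greatly simplified: overflows are delegated to a coarser wheel created on demand, and a tick at a higher level drains its bucket back through the lowest-level add, which either runs a task or moves it into a finer wheel.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    // Illustrative hierarchy sketch (not Kafka's actual classes). A task is modeled as just
    // its absolute expiration time; expired tasks are handed to an onExpired callback.
    class HierarchicalWheel {
        private final long tickMs;               // resolution u of this level
        private final int wheelSize;             // n buckets
        private final long interval;             // n * u, the span covered by this level
        private long currentTime;                // rounded down to a multiple of tickMs
        private final List<List<Long>> buckets = new ArrayList<>();
        private HierarchicalWheel overflowWheel; // created on demand at the first overflow

        HierarchicalWheel(long tickMs, int wheelSize, long startMs) {
            this.tickMs = tickMs;
            this.wheelSize = wheelSize;
            this.interval = tickMs * wheelSize;
            this.currentTime = startMs - (startMs % tickMs);
            for (int i = 0; i < wheelSize; i++) buckets.add(new ArrayList<>());
        }

        // Insert is O(m), where m is the number of levels the expiration has to climb.
        void add(long expirationMs, Consumer<Long> onExpired) {
            if (expirationMs < currentTime + tickMs) {
                onExpired.accept(expirationMs);            // already expired: run immediately
            } else if (expirationMs < currentTime + interval) {
                buckets.get((int) ((expirationMs / tickMs) % wheelSize)).add(expirationMs);
            } else {
                if (overflowWheel == null)                 // the next level's resolution is n * u
                    overflowWheel = new HierarchicalWheel(interval, wheelSize, currentTime);
                overflowWheel.add(expirationMs, onExpired);
            }
        }

        // Advance the clock of this level and, recursively, of the overflow levels. A bucket
        // expiring at a higher level is drained by re-adding its tasks at the lowest level,
        // which either runs them or moves them into a finer-grained bucket.
        void advanceTo(long timeMs, HierarchicalWheel lowest, Consumer<Long> onExpired) {
            while (timeMs >= currentTime + tickMs) {
                currentTime += tickMs;
                int idx = (int) ((currentTime / tickMs) % wheelSize);
                List<Long> drained = buckets.set(idx, new ArrayList<>());
                for (long exp : drained) lowest.add(exp, onExpired);
            }
            if (overflowWheel != null) overflowWheel.advanceTo(timeMs, lowest, onExpired);
        }
    }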

Doubly Linked List for Buckets in Timing Wheels

In this design, we propose to use our own implementation of a doubly linked list for the buckets in a timing wheel. The advantage of a doubly linked list is that it allows O(1) insert/delete of a list item if we have access to the link cells in the list.
A timer task saves a link cell in itself when it is enqueued to a timer queue. When a task is completed or canceled, the list is updated using the link cell saved in the task itself.
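A sketch of what such a bucket might look like is shown below. The names are illustrative, and concurrency control is largely elided; a real implementation needs careful locking.

    // Sketch of a bucket as an intrusive doubly linked list. Each entry keeps its own prev/next
    // links and a pointer to the list holding it, so completing or cancelling a task removes it
    // in O(1) without scanning the list.
    class TimerTaskEntry {
        TimerTaskEntry prev;
        TimerTaskEntry next;
        TimerTaskList list;          // the bucket currently holding this entry, if any
        final Runnable task;

        TimerTaskEntry(Runnable task) { this.task = task; }

        // Called when the task completes or is cancelled; thread-safety is simplified here.
        void remove() {
            if (list != null) list.remove(this);
        }
    }

    class TimerTaskList {
        // Sentinel node: root.next is the head, root.prev is the tail.
        private final TimerTaskEntry root = new TimerTaskEntry(null);

        TimerTaskList() {
            root.prev = root;
            root.next = root;
        }

        // O(1) insert at the tail; the entry records which list it is in (its link cell).
        synchronized void add(TimerTaskEntry entry) {
            entry.prev = root.prev;
            entry.next = root;
            root.prev.next = entry;
            root.prev = entry;
            entry.list = this;
        }

        // O(1) removal using the links stored in the entry itself.
        synchronized void remove(TimerTaskEntry entry) {
            if (entry.list != this) return;   // already removed or moved to another bucket
            entry.prev.next = entry.next;
            entry.next.prev = entry.prev;
            entry.prev = null;
            entry.next = null;
            entry.list = null;
        }
    }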

Driving Clock using DelayQueue

A simple implementation may use a thread that wakes up every unit of time and does the ticking, checking whether there is any task in the bucket. This can be wasteful if requests are sparse. We want the thread to wake up only when there is a non-empty bucket to expire. We will do so by using java.util.concurrent.DelayQueue similarly to the current implementation, but we will enqueue task buckets instead of individual tasks. This design has a performance advantage: the number of items in the DelayQueue is capped by the number of buckets, which is usually much smaller than the number of tasks, so the number of offer/poll operations on the priority queue inside the DelayQueue will be significantly smaller.
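A sketch of how the clock could be driven this way is given below. The names are hypothetical, and a real implementation would also have to guard against enqueueing the same bucket more than once for one expiration (e.g. with a flag on the bucket).

    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    // Sketch of driving the clock with a DelayQueue of buckets rather than of individual tasks.
    class BucketDelayQueueDriver {

        static class Bucket implements Delayed {
            final long expirationMs;   // absolute time at which this bucket expires
            Bucket(long expirationMs) { this.expirationMs = expirationMs; }

            @Override
            public long getDelay(TimeUnit unit) {
                return unit.convert(expirationMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            }

            @Override
            public int compareTo(Delayed other) {
                if (other instanceof Bucket) {
                    return Long.compare(this.expirationMs, ((Bucket) other).expirationMs);
                }
                return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
            }
        }

        private final DelayQueue<Bucket> delayQueue = new DelayQueue<>();

        // Enqueue the bucket, not its tasks; the queue holds at most one element per bucket.
        void onBucketScheduled(Bucket bucket) {
            delayQueue.offer(bucket);
        }

        // The reaper thread blocks until some non-empty bucket is due, instead of waking up on
        // every tick.
        void runReaperLoop() throws InterruptedException {
            while (!Thread.currentThread().isInterrupted()) {
                Bucket bucket = delayQueue.poll(200, TimeUnit.MILLISECONDS);
                if (bucket != null) {
                    advanceClockAndExpire(bucket);
                }
            }
        }

        private void advanceClockAndExpire(Bucket bucket) {
            // Advance the timing wheel to bucket.expirationMs and run or reinsert its tasks
            // (omitted here; see the hierarchical wheel sketch above).
        }
    }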

Purge of Watcher Lists

In the current implementation, the purge operation of the watcher lists is triggered by the total size of the watcher lists. The problem is that the watcher lists may exceed the threshold even when there aren't many requests to purge. When this happens, it increases the CPU load significantly. Ideally, the purge operation should be triggered by the number of completed requests in the watcher lists.

In the new design, a completed request is removed from the timer queue immediately with O(1) cost. This means that the number of requests in the timer queue is exactly the number of pending requests at any time. So, if we know the total number of distinct requests in the purgatory, which is the sum of the number of pending requests and the number of completed but still watched requests, we can avoid unnecessary purge operations. It is not trivial to keep track of the exact number of distinct requests in the purgatory because a request may or may not be watched. In the new design, we therefore estimate the total number of requests in the purgatory rather than trying to maintain the exact number.

The estimated number of requests is maintained as follows. The estimated total number of requests, E, is incremented whenever a new request is watched. Before starting the purge operation, we reset the estimated total number of requests to the size of the timer queue. If no requests are added to the purgatory during the purge, E is the correct number of requests after the purge. If some requests are added to the purgatory during the purge, E is incremented by the number of newly watched requests. This may be an overestimate because it is possible that some of the new requests are completed and removed from the watcher lists during the purge operation. We expect both the chance and the amount of overestimation to be small.
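A sketch of how the estimate could drive the purge decision is shown below. The field and method names are assumptions made for illustration, not the actual API.

    import java.util.concurrent.atomic.AtomicInteger;

    // Sketch of a purge trigger based on the estimated number of distinct operations.
    class PurgatoryPurgeEstimator {
        private final AtomicInteger estimatedTotalOperations = new AtomicInteger(0);
        private final int purgeInterval;     // configured threshold
        private final TimerStub timer;       // holds pending operations only, thanks to O(1) removal

        PurgatoryPurgeEstimator(TimerStub timer, int purgeInterval) {
            this.timer = timer;
            this.purgeInterval = purgeInterval;
        }

        // Called whenever a new operation is added to a watcher list (increments E).
        void onWatched() {
            estimatedTotalOperations.incrementAndGet();
        }

        // Called periodically (e.g. by the expiration reaper). Because completed operations leave
        // the timer immediately, (E - timer.size()) approximates how many completed-but-still-
        // watched operations are lingering in the watcher lists.
        void maybePurge() {
            if (estimatedTotalOperations.get() - timer.size() > purgeInterval) {
                // Reset E to the number of pending operations before purging; operations watched
                // during the purge re-increment it, possibly giving a slight overestimate.
                estimatedTotalOperations.getAndSet(timer.size());
                purgeCompletedFromWatcherLists();
            }
        }

        private void purgeCompletedFromWatcherLists() {
            // Scan watcher lists and drop completed operations (omitted).
        }

        interface TimerStub {
            int size();   // number of pending operations in the timer
        }
    }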

Parameters

  • the tick size (the minimum time unit)
  • the wheel size (the number of buckets per wheel)

Benchmark

We compared the enqueue performance of two purgatory implementations, the current implementation and the proposed new implementation. This is a micro-benchmark: it measures only the purgatory enqueue performance. The purgatory was separated from the rest of the system and uses a fake request which does nothing useful, so the throughput of the purgatory in a real system may be much lower than the numbers shown by this test.

In the test, the intervals between requests are assumed to follow an exponential distribution. Each request takes a completion time drawn from a log-normal distribution. By adjusting the shape of the log-normal distribution, we can test different timeout rates.

The tick size is 1ms and the wheel size is 20. The timeout was set to 200ms. The data size of a request was 100 bytes. For the low timeout rate case, we chose a 75th percentile of 60ms and a 50th percentile of 20ms; for the high timeout rate case, we chose a 75th percentile of 400ms and a 50th percentile of 200ms. A total of one million requests were enqueued in each run.
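For illustration, the request stream could be generated as sketched below. This is an assumption about how such a harness might be written, not the actual benchmark code; with the low-timeout settings above (50th percentile = 20ms, 75th percentile = 60ms) the derived sigma is about 1.63, and most completion times then fall well under the 200ms timeout.

    import java.util.Random;

    // Sketch of a request-stream generator: exponential inter-arrival times for a target rate,
    // and log-normal completion times parameterized from the stated 50th/75th percentiles.
    class RequestStreamSketch {
        private final Random random = new Random();
        private final double meanIntervalMs;  // 1000 / target requests-per-second
        private final double mu;              // ln(median completion time)
        private final double sigma;           // derived from the 75th percentile

        RequestStreamSketch(double targetRps, double p50Ms, double p75Ms) {
            this.meanIntervalMs = 1000.0 / targetRps;
            this.mu = Math.log(p50Ms);
            // For a log-normal distribution, the p-th quantile is exp(mu + sigma * z_p);
            // z_0.75 ~= 0.6745, so sigma = (ln(p75) - ln(p50)) / 0.6745.
            this.sigma = (Math.log(p75Ms) - this.mu) / 0.6745;
        }

        // Exponentially distributed gap until the next request is enqueued.
        double nextArrivalGapMs() {
            return -meanIntervalMs * Math.log(1.0 - random.nextDouble());
        }

        // Log-normally distributed completion time; requests whose completion time exceeds
        // the 200ms timeout end up expiring in the purgatory instead of completing.
        double nextCompletionTimeMs() {
            return Math.exp(mu + sigma * random.nextGaussian());
        }
    }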

Requests are actively completed by a separate thread. Requests that are supposed to complete before the timeout are enqueued to another DelayQueue, and a separate thread keeps polling it and completing them. There is no guarantee of accuracy in terms of the actual completion time.

The JVM heap size is set to 200m to reproduce a memory tight situation.

The result shows a dramatic difference at high enqueue rates. As the target rate increases, both implementations keep up with the requests initially. However, in the low timeout scenario the old implementation saturated around 40,000 RPS (requests per second), whereas the proposed implementation did not show any significant performance degradation; in the high timeout scenario the old implementation saturated around 25,000 RPS, whereas the proposed implementation saturated around 105,000 RPS in this benchmark.

CPU usage is significantly better in the new implementation.

Finally, we measured the total GC time (in milliseconds) for ParNew and CMS collections. There isn't much difference between the old and new implementations within the range of enqueue rates that the old implementation can sustain.

Summary

In the new design, we use Hierarchical Timing Wheels for the timeout timer and a DelayQueue of timer buckets to advance the clock on demand. Completed requests are removed from the timer queue immediately with O(1) cost. The buckets remain in the delay queue; however, the number of buckets is bounded, and, in a healthy system, most of the requests are satisfied before timeout, so many of the buckets become empty before being pulled out of the delay queue. Thus, the timer should rarely have buckets for the lower (finer-grained) intervals. The advantage of this design is that the number of requests in the timer queue is exactly the number of pending requests at any time. This allows us to estimate the number of requests that need to be purged, so we can avoid unnecessary purge operations on the watcher lists. As a result, we achieve higher scalability in terms of request rate with much better CPU usage.

 


17 Comments

  1. Thanks for writing up the proposal. A few comments below.

    1. About using the timing wheel. Our current timeout is in milliseconds. The timeouts are typically in hundreds of ms, but can be multiple seconds or more. Would we choose u to be 1 ms? Would that cause the timing wheel to do busy checking every ms? It would be useful to outline the mechanism that the timing wheel uses to expire tasks. Also, what would be a good value for n?
    2. In purgatory, a request can be watched on many keys (say thousands); when an event triggers the completion of a request, we need to remove it from the watch list of all keys. Doing that in the caller thread may increase the latency of the request. If there is an easy way to do this in a background thread (as soon as possible), it would be better.
  2. Yasuhiro Matsuda may have ideas, but here are my thoughts -

    Would we choose u to be 1 ms?

    No (smile) That wouldn't make sense due to busy checking. u is your error delta, which should be small enough that the timeout is roughly respected but not so small that it causes busy waiting and diminishing returns. We should have a better idea of exact values for u and n after running some tests with real requests.

    Would that cause the timing wheel to do busy checking every ms? 

    Quite obviously, yes. But due to the reasons mentioned above, we shouldn't do it!

    On #2 above, I'm not entirely convinced that expiring thousands of requests in this new design will take much noticeable time. But this is something that we should observe through test results as well. I'd avoid early optimizing without knowing the actual impact.

  3. For ticking, I have a new idea that uses a DelayQueue of timer buckets (not requests). This allows us to advance the clock based on actual usage.

    On #2, I saw a difficulty in using the doubly linked list for watcher lists. It comes from concurrency issues. It is a challenge to make it correct and also performant. The resulting code is messy and hard to reason about. At this point, I am more inclined to use a safer approach using weak references.

    I updated the design from these two points.

  4. Interesting proposal. Thanks for writing it up!

    Just curious - the use of WeakReferences handles the GC of the delayed operations, but you still have weak references in the list (which I'm assuming are fixed-size and small, probably since each is just a pointer plus some fixed state)? I guess we could take a heap dump and check. So to be clear, this mitigates the problem since instead of having arbitrary delayed operations sitting around waiting to be cleared up by the expiration reaper, we now have fixed-size weak references. Is this correct?

  5. Correct. As you summarized, the use of WeakReference has a positive effect in GC. And cleared WeakReferences in the lists can be removed lazily. Basically we let JVM do bookkeeping of cleared WeakReferences through ReferenceQueue, and we will know when to purge the lists. I assume this part is lightweight, but it may require some tuning.

  6. Thanks for the writeup! A few comments below:

    1. Today when we try to complete a request based on a key, it is only removed from that watcher list, but will still be in other watcher lists (if it has multiple keys), and we will not remove it from the delayed queue. Hence the elements will not be removed unless a purge is triggered or they expire. When we move from a delayed queue of elements to a delayed queue of buckets, the bucket elements (strong references) will not be deleted once inserted unless a purge is triggered. So it sounds to me that GC still depends on the purging interval?

    2. A simpler alternative to configs of (bucket size + # of buckets) with a hierarchical structure would be just using (bucket size) with an unlimited number of non-contiguous buckets, which may work just as well in practice (I think Zookeeper is using a similar approach)?
  7. When a request is completed we will immediately remove it from its bucket, so the strong reference to the request is cleared. Having a pointer to the bucket in a request and using a doubly linked list for a bucket is exactly for that purpose. All remaining references are weak references from watcher lists, so completed requests become GCable right away. The buckets remain in the delay queue. However, in a healthy system, most of the requests are satisfied before timeout, and many of the buckets become empty before being pulled out of the delay queue. Also, the timer should rarely have buckets for the lower intervals. For these reasons, the delay queue of buckets is more advantageous than the delay queue of requests.

    1. I see, this makes much sense and would be a good improvement, especially for the consumer coordinator which will also heavily use the purgatory. In our current design we are using something like a heartbeat bucket for the heartbeat purgatory on its own, but if this change can be checked in sooner we could rebase our implementation on it.

  8. Thanks for the experimental results. A few comments below.

    1. In the figure, it seems that timeout really means the actual request completion time. We fix the timeout to 200ms, but vary the completion time based on a certain distribution.
    2. In the figure, I guess the low timeout cases mean that most requests are completed before they expire whereas the high timeout cases mean that most requests expire. For the low timeout cases, I understand why the new approach is better since there are fewer items maintained in the DelayedQueue. However, for the high timeout cases, it's not clear to me why the new approach is better since in this case, we have to monitor requests at the finest level of granularity and the number of items in the DelayedQueue is probably comparable to the original approach. Could you provide a bit more explanation on this?
    3. I am not sure that I follow the description "Another thing worth noting is that there was a performance degradation in the current implementation when timeouts happen more often. It may be because the current implementation has to purge the delay queue and watch lists more often. The proposed implementation shows a slight degradation only when the enqueue rate was very high." Could you provide a bit more details?
    4. Also, could we also measure the impact on CPU and GC activities?
    5. In addition to requests with a small # of keys, it would be useful to test the case with many keys (say 1000).
  9. RE. #2

    Even in the high timeout case, the number of items in the DelayQueue is very different (# of requests vs. # of buckets). Removing one item from a DelayQueue costs O(log n) (n is the number of items in the queue), and n is bigger in the old implementation. In addition, there is one remove operation per item for the old implementation, while there is one per bucket for the new implementation. I expect the combined effect shows up.

    RE. #3

    This is the high timeout case again. In the old implementation, purge is triggered by the size of the watchers or the size of the queue. It is executed whenever they exceed the threshold (purgeInterval). When requests are coming in at a high rate, and most of them are timing out, both the size of the watchers and the size of the queue become bigger, so there is a greater chance that a purge is triggered. In the worst case, they are always above the threshold and trigger a purge every time the expiration thread wakes up.

     

  10. 2. In the high timeout case, are there multiple items having the exact same timeout (at ms level)? If not, there will still be an item in the DelayedQueue per request, right?

  11. If no two requests have the same expiration, there is one request per bucket at the lowest wheel.  But buckets in higher wheels are likely to group requests together, so the size of the priority queue is much smaller.

  12. I ditched WeakReference because it increased CMS a lot.

  13. Can you clarify your last comment? The doc still mentions weak reference.

    For the benchmark:

    These are very useful and cool graphs! I just wanted to clarify my understanding on a couple of points: the old approach saturates at 40k/25k RPS. If this is due to GC and CPU why do the data points completely stop toward the right side of the target rate?

    I was also trying to fully understand why the new approach matches the target rate up to a much higher value than the old.

    My understanding of the approach is that you would want to size your first wheel and resolution to fit a reasonably large percentile of expected timeouts. The reason for this being: insert (start-timer) cost is O(m), but the actual tick can move on to the next timing wheel and thus needs to re-insert the tasks (if any), which will be O(n) where n is the number of tasks in the higher wheel - and if there are even higher wheels then those need to be re-inserted into the next-lower wheel. So if the entry wheel for the 75th percentile of tasks is k, then those tasks would be re-inserted k times. If k >> 2 then both n and u need to be recalibrated. So with the settings you have used, it seems the 75th percentile would fall into two wheels and the 50th percentile of requests would fall into the first wheel. Say the 90th percentile is in the third wheel - those would need to get re-inserted into the second wheel and then the first wheel. If my understanding is correct, then how quickly does the point of degradation come down if the percentile in higher wheels increases? Basically, I'm trying to gauge how resilient the performance is to bad configuration. Regardless, this would be an excellent candidate for dynamic configs - i.e., some of these are hard to set ahead of time and you would want to be able to adjust these knobs on the fly.

  14. Oops. My bad. I thought I had removed all mentions of the weak reference. I updated the doc.

    I think the old implementation has a CPU bottleneck and maybe concurrency issues. The X axis of the CPU graph is the actual rate, not the target rate.

    You are right. A task in the k-th wheel is reinserted k times until it times out. But luckily, k grows only logarithmically with the timeout (roughly log base n of the timeout over u). I think we are fine unless the timeout is ridiculously long.

    Dynamic config sounds interesting. It may be possible to seamlessly migrate the timer to a new configuration while running. 

  15. Thanks for the new test results. How many keys were used in the test?

    1. I used three keys. I can add more benchmarks with larger numbers of keys.