Introduction

Kafka implements several request types that cannot immediately be answered with a response. Examples:

  • A produce request with acks=all cannot be considered complete until all replicas have acknowledged the write and we can guarantee it will not be lost if the leader fails.
  • A fetch request with min.bytes=1 is not answered until at least one new byte of data is available for the consumer. This allows a "long poll", so the consumer need not busy-wait checking for new data to arrive.

These requests are considered complete when either (a) the criteria they requested are met or (b) a timeout occurs.
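To make the two completion paths concrete, here is a minimal sketch of client settings that correspond to the examples above. It assumes the configuration names used by the modern Java clients (`acks`, `fetch.min.bytes`, `fetch.max.wait.ms`), which may differ from the names in older releases. The class name `DelayedRequestConfigSketch` is hypothetical, and a real client would also need connection and serializer settings.

```java
import java.util.Properties;

/** Hypothetical illustration only; not part of Kafka. */
public class DelayedRequestConfigSketch {

    /** Producer side: the request completes only after all in-sync replicas acknowledge. */
    public static Properties producerProps() {
        Properties p = new Properties();
        p.setProperty("acks", "all");
        return p;
    }

    /** Consumer side: criterion (a) is the byte threshold, criterion (b) is the wait limit. */
    public static Properties consumerProps() {
        Properties c = new Properties();
        c.setProperty("fetch.min.bytes", "1");      // answer once at least one byte is available
        c.setProperty("fetch.max.wait.ms", "500");  // or answer anyway after this long (value assumed)
        return c;
    }
}
```

With these settings a fetch is held on the broker until either one byte arrives or the wait limit passes, whichever comes first.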

We intend to expand the use of this delayed request facility for various additional purposes including partition assignment and potentially quota enforcement.

The number of these asynchronous operations in flight at any time scales with the number of connections, which for Kafka is often tens of thousands.

A naive implementation would simply block a thread until the criteria are met, but this would not scale to the large number of in-flight requests Kafka has.

The current approach uses a data structure called the "request purgatory" that holds any uncompleted requests, which allows non-blocking, event-based handling of these asynchronous requests. This approach is better than dedicating a thread to each in-flight request, but our implementation of the data structure has a number of deficiencies. The goal of this proposal is to improve the efficiency of this data structure.

Current Design

The request purgatory consists of a timeout timer and a hash map of watcher lists for event-driven processing. A request is put into the purgatory when it cannot be satisfied immediately because of unmet conditions. A request in the purgatory is completed later when its conditions are met, or is forced to complete (timeout) once the time specified in the request's timeout parameter has elapsed. Currently (0.8.x) it uses Java's DelayQueue to implement the timer and Java's LinkedList for each watcher list.
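The structure described above can be sketched roughly as follows. This is a simplified illustration, not Kafka's actual purgatory code: the names `PurgatorySketch`, `DelayedRequest`, `tryCompleteElseWatch`, `checkAndComplete`, and `expire` are hypothetical, watch keys are plain strings, and locking is reduced to coarse `synchronized` methods.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of a timer plus a hash map of watcher lists. */
public class PurgatorySketch {

    /** A request that completes exactly once: when satisfied, or when forced by timeout. */
    public static class DelayedRequest implements Delayed {
        private final long deadlineNanos;
        private final BooleanSupplier satisfied;
        private final AtomicBoolean completed = new AtomicBoolean(false);

        public DelayedRequest(long timeoutMs, BooleanSupplier satisfied) {
            this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            this.satisfied = satisfied;
        }

        /** Completes only if the condition holds and nobody completed it first. */
        boolean tryComplete() { return satisfied.getAsBoolean() && forceComplete(); }

        /** Completes unconditionally; returns false if it was already completed. */
        boolean forceComplete() { return completed.compareAndSet(false, true); }

        public boolean isCompleted() { return completed.get(); }

        @Override public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    private final DelayQueue<DelayedRequest> timer = new DelayQueue<>();
    private final Map<String, LinkedList<DelayedRequest>> watchers = new HashMap<>();

    /** Returns true if the request completed immediately; otherwise parks it. */
    public synchronized boolean tryCompleteElseWatch(DelayedRequest r, List<String> keys) {
        if (r.tryComplete()) return true;
        for (String key : keys) {
            watchers.computeIfAbsent(key, k -> new LinkedList<>()).add(r);
        }
        timer.add(r);
        return false;
    }

    /** Called when an event touches a key; returns how many requests it completed. */
    public synchronized int checkAndComplete(String key) {
        LinkedList<DelayedRequest> list = watchers.get(key);
        if (list == null) return 0;
        int n = 0;
        Iterator<DelayedRequest> it = list.iterator();
        while (it.hasNext()) {
            DelayedRequest r = it.next();
            if (r.isCompleted()) {
                it.remove();            // already finished elsewhere (e.g. timed out)
            } else if (r.tryComplete()) {
                it.remove();
                n++;
            }
        }
        return n;
    }

    /** Forces completion of every request whose deadline has passed. */
    public int expire() {
        int n = 0;
        DelayedRequest r;
        while ((r = timer.poll()) != null) {   // poll() returns only expired entries
            if (r.forceComplete()) n++;
        }
        return n;
    }
}
```

Note that in this sketch a request completed through `checkAndComplete` stays in the `DelayQueue` until its deadline passes, and a timed-out request stays in its watcher lists until the next check on that key.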

...