Request Purgatory (0.8)

Without going into too much detail (the code holds the truth), this page describes the work done by the request purgatory and its associated components.

What is purgatory?

The request purgatory is a holding pen for requests waiting to be satisfied (Delayed). Of all Kafka request types, it is used only for produce and fetch requests. The main reason for keeping these requests in purgatory while waiting to be satisfied is that both may legitimately have to wait before a response can be sent: a produce request may need to wait until enough replicas have acknowledged the data, and a fetch request may need to wait until enough data has accumulated or its timeout expires (both cases are detailed in the sections below).

Produce and fetch requests have different conditions for being added to and removed from their respective purgatory. In fact, there is a request purgatory implementation for each: ProducerRequestPurgatory for produce requests and FetchRequestPurgatory for fetch requests. Both extend the RequestPurgatory abstract class and add the request type-specific checks for expiration and satisfaction.

Also, it should be noted that requests are not dropped by the purgatory but simply removed from its watch once they are satisfied. Under normal conditions a client will always get a response from the broker, whether everything went smoothly or an error occurred.

Flow

When a request comes in and meets the conditions to be put in purgatory (delayed), the following happens:

  1. The delayed request counter is incremented.
  2. The request is added to the watchers pool, keyed on the (topic, partition) tuple.
  3. The request is enqueued (DelayQueue) for the request reaper thread (detailed below).

So there are two references to each delayed request: one held by the purgatory's watchers and one held by the request reaper. Once the request is satisfied, it is eventually removed (purged) from both components.
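
As an illustration of the flow above, here is a minimal Scala sketch of a purgatory keeping both references; the class, field and method names are made up for this page, not the actual broker code.

    import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}
    import scala.collection.mutable

    // Hypothetical, simplified model of a delayed request held by the purgatory.
    class DelayedRequest(val keys: Seq[(String, Int)], delayMs: Long) extends Delayed {
      private val dueNs = System.nanoTime + TimeUnit.MILLISECONDS.toNanos(delayMs)
      @volatile var satisfied = false

      override def getDelay(unit: TimeUnit): Long =
        unit.convert(dueNs - System.nanoTime, TimeUnit.NANOSECONDS)

      override def compareTo(other: Delayed): Int =
        java.lang.Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS))
    }

    class Purgatory {
      private val watchers = mutable.Map[(String, Int), mutable.ListBuffer[DelayedRequest]]()
      val delayQueue = new DelayQueue[DelayedRequest]()
      var watched = 0   // counter of delayed requests handed to the purgatory

      // Mirrors the flow above: bump the counter, register the request under each
      // (topic, partition) key it waits on, and enqueue it for the reaper thread.
      def watch(request: DelayedRequest): Unit = synchronized {
        watched += 1
        request.keys.foreach { key =>
          watchers.getOrElseUpdate(key, mutable.ListBuffer[DelayedRequest]()) += request
        }
        delayQueue.add(request)
      }
    }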

Request Reaper Thread

The request reaper thread's purpose is to expire requests that have been delayed past their deadline.

It polls for expired requests and, when it finds one, runs the purgatory's expire method to handle the delayed request's expiration. In both the produce and fetch cases, this sends a response to the client. An expired request counts as satisfied, so it is eventually purged from both components.
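
A minimal sketch of such a reaper loop, reusing the hypothetical DelayedRequest and Purgatory classes from the sketch above; here expire stands in for the type-specific expiration handling that sends the response.

    import java.util.concurrent.TimeUnit

    class RequestReaper(purgatory: Purgatory, expire: DelayedRequest => Unit) extends Runnable {
      @volatile private var running = true

      override def run(): Unit = {
        while (running) {
          // poll() returns a request whose delay has elapsed, or null after the timeout
          val expired = purgatory.delayQueue.poll(200, TimeUnit.MILLISECONDS)
          if (expired != null && !expired.satisfied) {
            expired.satisfied = true   // an expired request counts as satisfied
            expire(expired)            // e.g. send the (possibly error or empty) response
          }
        }
      }

      def shutdown(): Unit = { running = false }
    }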

The next step of the thread's loop is to check the purge interval configuration (*.purgatory.purge.interval.requests). When the number of delayed requests handed to the purgatory to watch reaches this value, the thread goes through all previously queued requests and removes those marked as satisfied. Because the trigger is the number of requests watched, not the number of satisfied requests or the size of the queue, it is really an interval rather than a threshold.
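
To make the interval-versus-threshold point concrete, an illustrative purge check could look like this, continuing the earlier sketch; purgeInterval stands in for *.purgatory.purge.interval.requests.

    // Illustrative only: the purge is driven by the counter of requests handed to the
    // purgatory ("watched" in the sketch above), not by how many of them are satisfied
    // or by the size of the queue.
    object PurgeCheck {
      def maybePurgeSatisfied(purgatory: Purgatory, purgeInterval: Int): Unit = {
        if (purgatory.watched >= purgeInterval) {
          purgatory.watched = 0
          val it = purgatory.delayQueue.iterator()
          while (it.hasNext) {
            if (it.next().satisfied) it.remove()   // drop requests already marked satisfied
          }
        }
      }
    }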

Purge interval configuration

The purge interval configuration (*.purgatory.purge.interval.requests) is mostly an "internal" config that generally doesn't need to be modified by users.

More details on why it was added are in KAFKA-664.

Producer request handling

When is it added to purgatory (delayed)?

A producer request is delayed when its acknowledgement setting requires the broker to wait for follower replicas rather than just the leader.

Producer config: request.required.acks

When does it expire?

When it reaches the timeout defined in the produce request (ackTimeoutMs), which translates from the producer config request.timeout.ms.
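
For illustration, a producer configured so that its requests take the delayed path might look like this; a sketch assuming the 0.8 Scala producer client, with a made-up broker address and topic name.

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    object DelayedProduceExample extends App {
      val props = new Properties()
      props.put("metadata.broker.list", "localhost:9092")             // assumed broker address
      props.put("serializer.class", "kafka.serializer.StringEncoder")
      props.put("request.required.acks", "-1")  // wait for all in-sync replicas: delayed on the broker
      props.put("request.timeout.ms", "10000")  // becomes ackTimeoutMs, i.e. the expiration in purgatory

      val producer = new Producer[String, String](new ProducerConfig(props))
      producer.send(new KeyedMessage[String, String]("test-topic", "hello"))
      producer.close()
    }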

What happens (on the broker) when it expires?

It sends a response to the client. The content of the response depends on the request, of course.

When is it satisfied?

In a few words, it is satisfied when enough followers have reached the required offset, based on the ack value (-1/all, meaning every replica in the ISR, or a given number of replicas).
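
A simplified sketch of that satisfaction check (not the actual broker code): for each (topic, partition) the request waits on, count how many ISR replicas have caught up to the offset that needs to be acknowledged.

    object ProduceSatisfaction {
      // End offsets of the replicas currently in the ISR for one partition,
      // plus the offset this produce request needs acknowledged.
      case class PartitionAckState(requiredOffset: Long, isrEndOffsets: Seq[Long])

      def isSatisfied(requiredAcks: Int, partitions: Seq[PartitionAckState]): Boolean =
        partitions.forall { p =>
          val acks = p.isrEndOffsets.count(_ >= p.requiredOffset)
          if (requiredAcks == -1) acks == p.isrEndOffsets.size  // -1/all: every ISR replica has the data
          else acks >= requiredAcks                             // otherwise: at least requiredAcks replicas
        }
    }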

Fetch request handling

When is it added to purgatory (delayed)?

A fetch request will be added to purgatory depending on its MaxWaitTime and MinBytes request parameters: if the minimum amount of data the consumer asked for is not immediately available and the consumer is willing to wait, the request is delayed.

The corresponding consumer configurations are fetch.wait.max.ms and fetch.min.bytes, detailed below.

When does it expire?

After the amount of time the consumer is willing to wait for data (MaxWaitTime).

Consumer configuration: fetch.wait.max.ms
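
For illustration, a consumer configured with both of these settings might look like this; a sketch assuming the 0.8 high-level Scala consumer, with a made-up ZooKeeper address and group id.

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}

    object DelayedFetchExample extends App {
      val props = new Properties()
      props.put("zookeeper.connect", "localhost:2181")  // assumed ZooKeeper address
      props.put("group.id", "purgatory-example")        // made-up group id
      props.put("fetch.wait.max.ms", "1000")   // MaxWaitTime: how long a delayed fetch may sit in purgatory
      props.put("fetch.min.bytes", "65536")    // MinBytes: data needed before the fetch is satisfied (see below)

      val connector = Consumer.create(new ConsumerConfig(props))
      // ... create message streams and consume ...
      connector.shutdown()
    }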

When is it satisfied?

A fetch request is satisfied when it has accumulated enough data to meet the MinBytes field.

Consumer configuration: fetch.min.bytes
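
A simplified sketch of the fetch satisfaction condition (again, not the broker's actual code): a delayed fetch accumulates the bytes that become available on the partitions it watches and is satisfied once the total reaches MinBytes.

    // Each time data is appended to a watched partition, the delayed fetch records the
    // new bytes; once enough has accumulated it is satisfied and a response can be sent.
    class DelayedFetchState(minBytes: Int) {
      private var bytesAccumulated = 0L

      def recordAppend(bytesAppended: Long): Boolean = {
        bytesAccumulated += bytesAppended
        bytesAccumulated >= minBytes   // true => satisfied, respond to the consumer
      }
    }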

Metrics

The following metrics exist for the fetch and produce purgatories:

To be verified
The same metric name is used for both the fetch and producer purgatories. I should probably open a Jira for this; it also causes trouble for the CSV reporter, which tries to create a second file with the same name (for whichever metric comes first).

References