Status

Current state: Adopted

Discussion thread: [DISCUSS] KIP-91

Vote thread: [VOTE] KIP-91

JIRA: KAFKA-5886

Release: 2.1.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In KIP-19, we added a request timeout to the network client. This change was necessary primarily to bound the time to detect broker failures. In the absence of such a timeout, the producer would learn of a failure only much later (typically after several minutes, depending on the TCP timeout), during which time the accumulator could fill up and cause requests to either block or get dropped, depending on the block.on.buffer.full configuration. One additional goal of KIP-19 was to make timeouts intuitive: it is important for users to be provided with a guarantee on the maximum duration between when the call to send returns and when the callback fires (or the future is ready). Notwithstanding the fact that intuition is subjective, we will see shortly that this goal has not been met.

In order to clarify the motivation, it will be helpful to review the lifecycle of records and record-batches in the producer, where the timeouts apply, and changes that have been made since KIP-19.

Behavior in KIP-19

The following figure illustrates these phases of the record lifecycle; the red circles are the potential points of timeout.

(Figure omitted: producer record/batch lifecycle with the potential timeout points marked in red.)

Change in KAFKA-2805 (to handle cluster outages)

One problem with the implementation of KIP-19 was that it did not check whether metadata was stale. For example, if the cluster suddenly became unavailable, the producer would never expire batches if it already had metadata available. So in KAFKA-2805 we removed the metadata-availability check entirely and now indiscriminately expire batches that are ready and have remained in the accumulator for at least request.timeout.ms, even if the leader broker is available.

Further change in KAFKA-3388 (to handle pessimistic timeouts and out-of-order callbacks when max.inflight.requests == 1)

The complete removal of the metadata availability check in KAFKA-2805 was problematic in that it:

  1. Leads to unfair/unnecessary timeouts, especially when preceding inflight batches encounter retries. (Unfair because those inflight batches are given another request.timeout.ms to expire.)
  2. Can cause callbacks to fire out of order when strict ordering is required. That is, when accumulator batches expire, their callbacks fire before the callbacks for inflight batches (which actually precede the batches in the accumulator). Note that this is really an issue only for max.inflight.requests == 1, since we don't attempt to make strict ordering guarantees for other inflight settings.
  3. Pessimistically expires batches even though it may be possible to make progress. (KIP-19, on the other hand, takes an optimistic view: do not expire batches if metadata is available, since we may be able to make progress.)

So KAFKA-3388 added a check (for the max.inflight.requests == 1 scenario only) on the inflight request queue, and expires batches only if there is currently no inflight request.

There are still pessimistic timeouts (KAFKA-4089)

One problem with the above incremental change is the way in which we check whether there is any inflight request. Since the check applies only to the max.inflight.requests == 1 scenario, we check whether the partition is muted. (We mute partitions when a batch is inflight for that partition in order to ensure ordering even during leader movements - see KAFKA-3197 for more details.) The issue, though, is that if a metadata request is inflight (say, due to a normal metadata refresh), the partitions on that broker will not be in a muted state (since it is not a batch that is inflight) and can expire if they have been sitting in the accumulator for at least request.timeout.ms. This is an unintuitive side-effect, given that those batches would otherwise have been sent out had the metadata refresh not occurred.

This is actually a highly probable scenario for a high-volume producer that sets max.inflight.requests to one, such as MirrorMaker, and it leads to unnecessary shutdowns of MirrorMaker.

It is possible to bump up request.timeout.ms, but that is undesirable, as explained in the Rejected Alternatives section.

Note also that it is insufficient to tweak the above check to see whether it is a metadata request that is inflight, because the problem is more fundamental than that: we would like to keep request.timeout.ms relatively small (at most a minute or so). If we continue to reuse request.timeout.ms for timing out batches in the accumulator, a high-volume producer is highly likely to expire several batches even in routine scenarios such as bouncing the cluster that the producer is sending to. E.g., if a broker is disconnected due to a bounce, then metadata will still be available, but there will be no inflight request to that broker, and so several batches that have been in the accumulator for more than request.timeout.ms will get expired. This would be fine if the accumulator timeout were large enough to account for the expected time batches will sit in the accumulator, but that time can be long for a high-volume producer. In the absence of an explicit accumulator timeout, the only option here is to artificially bump up request.timeout.ms.

Proposed Changes

We propose adding a new timeout, delivery.timeout.ms. The window of enforcement includes batching in the accumulator, retries, and the time the batch spends in flight. With this config, the user has a guaranteed upper bound, measured from the point when send returns, on when a record will either be sent, fail, or expire. In other words, we no longer overload request.timeout.ms to act as a weak proxy for an accumulator timeout; instead, we introduce an explicit timeout that users can rely on without exposing producer internals such as the accumulator.

This config enables applications to delegate error handling to Kafka to the maximum possible extent (by setting retries=MAX_INT and delivery.timeout.ms=MAX_LONG). It also enables MirrorMaker to bound the effect of unavailable partitions by setting delivery.timeout.ms sufficiently low, presumably as some function of the expected steady-state throughput. Specifically, setting delivery.timeout.ms to a minimum of request.timeout.ms + retry.backoff.ms + linger.ms allows at least one attempt to send the message when the producer isn't backed up.
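For illustration, here is a minimal sketch of a producer configured this way. The broker address, linger value, and class name are illustrative; the two timeout values shown are the client defaults.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class DeliveryTimeoutExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            int requestTimeoutMs = 30_000; // request.timeout.ms default
            int retryBackoffMs = 100;      // retry.backoff.ms default
            int lingerMs = 5;              // illustrative

            // The lower bound described above: leaves room for at least one
            // send attempt when the producer isn't backed up.
            int deliveryTimeoutMs = requestTimeoutMs + retryBackoffMs + lingerMs;

            props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
            props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);
            props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
            props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs);

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // producer.send(...) callbacks now fire within delivery.timeout.ms
                // of batch creation, whether the batch succeeds, fails, or expires.
            }
        }
    }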

The "timer" for each batch starts "ticking" at the creation of the batch. Batches expire in order when max.in.flight.request.per.connection==1. An in-flight batch expire when delivery.timeout.ms has passed since the batch creation irrespective of whether the batch is in flight or not. However, the producer waits the full request.timeout.ms for the in-flight request. This implies that user might be notified of batch expiry while a batch is still in-flight.

Public Interfaces
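The proposal adds a single new producer configuration, delivery.timeout.ms: an upper bound on the time to report success or failure after a call to send returns. It covers the time a record may be delayed prior to sending, the time awaiting acknowledgement from the broker, and the time allowed for retriable send failures. The default that shipped in the release noted above is 120000 (2 minutes). In addition, the default value of retries changes from 0 to MAX_INT (see the next section).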

Compatibility, Deprecation, and Migration Plan

Setting an explicit value of retries should be done with caution. If all the retries are exhausted, the request fails, and all the batches contained within the request are expired, even if delivery.timeout.ms has not fully elapsed. In essence, a batch expires when either delivery.timeout.ms has elapsed or the request containing the batch has failed, whichever happens first. (Note: due to the change in the default value of retries from 0 to MAX_INT and the existing default of max.in.flight.requests.per.connection == 5, reordering becomes a possibility by default. To prevent reordering, set max.in.flight.requests.per.connection == 1.)
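For example, a sketch of the two relevant settings for preserving strict ordering under the new defaults (the helper name is hypothetical):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    final class OrderingSafeConfig {
        static Properties orderingSafe(Properties props) {
            // With retries defaulting to MAX_INT and up to 5 in-flight requests,
            // a retried request can land behind a later one. Allowing only one
            // in-flight request per connection prevents that reordering.
            props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
            props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
            return props;
        }
    }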

Validation

The new configuration is backwards compatible. A ConfigException is thrown for timeout combinations that don't make sense (e.g., delivery.timeout.ms < linger.ms + request.timeout.ms + retry.backoff.ms).
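A hypothetical sketch of such a check, using the formula above:

    import org.apache.kafka.common.config.ConfigException;

    final class TimeoutValidation {
        // Reject delivery timeouts too small to fit even one send attempt:
        // linger time plus one request plus one retry backoff.
        static void validate(long deliveryTimeoutMs, long lingerMs,
                             long requestTimeoutMs, long retryBackoffMs) {
            if (deliveryTimeoutMs < lingerMs + requestTimeoutMs + retryBackoffMs) {
                throw new ConfigException("delivery.timeout.ms should be at least "
                        + "linger.ms + request.timeout.ms + retry.backoff.ms");
            }
        }
    }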

Test Plan

Rejected Alternatives