...

  1. When max.in.flight.requests.per.connection == 1: batch.expiry.ms = MAX_LONG. Batches will stay in the accumulator as long as progress is being made, which also ensures that expiration happens in order.
  2. When max.in.flight.requests.per.connection > 1: delivery and notification ordering is not needed/provided. batch.expiry.ms = request.timeout.ms. Batches may expire out of order.
As a consequence, the batch expiration delay is coupled with whether strict ordering is desired or not (see the sketch below).
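To make the two cases concrete, a minimal sketch of the corresponding producer settings. This is illustrative only: batch.expiry.ms is the configuration proposed in this document and has no ProducerConfig constant, and the 30000 ms value assumes the default request.timeout.ms.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class ExpiryDefaultsSketch {
        // Case 1: strict ordering; batches stay in the accumulator while progress is made.
        static Properties strictOrdering() {
            Properties props = new Properties();
            props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
            props.put("batch.expiry.ms", String.valueOf(Long.MAX_VALUE)); // proposed config, effectively "never expire on time alone"
            return props;
        }

        // Case 2: ordering relaxed; batches expire after the request timeout, possibly out of order.
        static Properties relaxedOrdering() {
            Properties props = new Properties();
            props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
            props.put("batch.expiry.ms", "30000"); // proposed config, matching the default request.timeout.ms
            return props;
        }
    }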

Validation

This configuration is backwards compatible. The producer should log WARN messages for combinations of timeouts that don't make sense (e.g., batch.expiry.ms < linger.ms).
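As an illustration of the kind of check intended here, a minimal sketch, assuming the batch.expiry.ms configuration from this proposal; the class and method names are hypothetical and not the actual producer configuration code:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class TimeoutSanityChecks {
        private static final Logger log = LoggerFactory.getLogger(TimeoutSanityChecks.class);

        // Hypothetical helper: warn (but do not fail) on timeout combinations that don't make sense.
        static void warnOnOddTimeouts(long batchExpiryMs, long lingerMs) {
            if (batchExpiryMs < lingerMs)
                log.warn("batch.expiry.ms ({}) is less than linger.ms ({}); batches may expire before they are eligible to be sent.",
                        batchExpiryMs, lingerMs);
        }
    }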

...

  • Bumping up the request timeout does not work well because it is an artificial way of dealing with the lack of an accumulator timeout. Setting it high also increases the time to detect broker failures.
  • In KAFKA-4089 we also considered checking whether metadata is stale to determine whether to expire. This may address the problem raised in KAFKA-4089, but it is still hard for users to understand without knowing the internals of the producer, and it makes it difficult to put an upper bound on the overall timeout.
  • We cannot repurpose max.block.ms since there are use-cases for non-blocking calls to send.
  • We also discussed the idea of providing precise per-record timeouts, or at least per-batch timeouts. These are very difficult to implement correctly, and we believe it is sufficient to give users the ability to determine an upper bound on delivery time (rather than specify it at a per-record level). Supporting per-record timeouts precisely is problematic because we would then need the ability to extract records from compressed batches, which is horribly inefficient. The difficulty is even more pronounced when requests are in flight, since batches would need to be extracted out of in-flight requests. If we are to honor the retry backoff settings, this would mean splitting an in-flight request containing an expiring record or batch into smaller requests, which is again horribly inefficient. Given the enormous complexity of implementing such semantics correctly and efficiently, and the limited value to users, we have decided against pursuing this path. The addition of an explicit timeout as summarized in this proposal will at least give users the ability to come up with a tight bound on the maximum delay before a record is actually sent out.
  • Allow batch.expiry.ms to span the inflight phase as well. This won't work because a request would contain batches from multiple partitions. One expiring batch should not cause the other batches to expire, and it is too inefficient to surgically remove the expired batch for the subsequent retry. 

  • An end-to-end timeout model: A single end-to-end timeout for the entire send operation is very easy to use and therefore a very compelling alternative. It is, however, not without pitfalls. The end-to-end timeout model we considered is mutually exclusive with segment-wise timeouts; in other words, you specify one or the other but not both.

    1. An end-to-end timeout does not subsume max.block.ms because the latter is a bound on how long the application threads may block. An end-to-end timeout may only subsume time spent in the accumulator (including linger.ms) and on the wire.

    2. In applications such as mirror-maker, where consumed records are immediately produced on the other side, "catch-up mode" is a frequent scenario that creates additional challenges for an end-to-end timeout model. A record may spend 99.95% of its end-to-end budget in the accumulator and leave very little budget for retrying over the wire, so the producer may expire batches without any retries at all. An end-to-end delay of MAX_INT may be sufficient for mirror-makers, but the same cannot be said about a general application with an end-to-end timeout of a few seconds. It is also unclear whether a send failure due to timeout is caused by an unavailable partition or simply by accumulator wait, whereas exhausting the configured retries combined with retry.backoff.ms pretty much guarantees that the failure is due to service unavailability.

    3. An end-to-end timeout may be partially emulated using future.get(timeout). The timeout must be greater than (batch.expiry.ms + nRetries * (request.timeout.ms + retry.backoff.ms)). Note that when the future times out, the Sender may continue to send the records in the background; implementing a cancellable future is one possibility for avoiding that (see the sketch at the end of this section).

  • An additional configuration called "partition.availability.budget.ms" for producing applications that don't care about an end-to-end bound on message delivery (and hence don't want to configure batch.expiry.ms) but do care about partitions that never make progress. A notional partition-unavailability budget is useful for kafka-mirror-maker-like applications; it could be a function of retries, the backoff period, and the request timeout. This was deemed too complicated for the benefit received. Specifically, a new mechanism would be needed on the producer side to keep track of partition unavailability over time, and "partition unknown" and "partition unavailable" have different nuances.
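As referenced in item 3 of the end-to-end timeout discussion above, a rough sketch of emulating an end-to-end bound with future.get(timeout). The configuration values and the sendWithDeadline helper are illustrative assumptions, not part of the producer API; in practice the values would be read from the producer configuration.

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class BoundedSendSketch {
        // Hypothetical helper: bound a single send by waiting on the future with a timeout
        // derived from the formula above.
        static RecordMetadata sendWithDeadline(KafkaProducer<String, String> producer,
                                               ProducerRecord<String, String> record)
                throws InterruptedException, ExecutionException {
            long batchExpiryMs = 60_000;    // proposed batch.expiry.ms (illustrative value)
            int retries = 3;                // retries
            long requestTimeoutMs = 30_000; // request.timeout.ms
            long retryBackoffMs = 100;      // retry.backoff.ms

            // Upper bound on delivery time: batch.expiry.ms + nRetries * (request.timeout.ms + retry.backoff.ms)
            long endToEndMs = batchExpiryMs + retries * (requestTimeoutMs + retryBackoffMs);

            Future<RecordMetadata> future = producer.send(record);
            try {
                return future.get(endToEndMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // The future timed out, but the Sender may still deliver the record in the
                // background; only a cancellable future (not currently provided) could abort it here.
                return null;
            }
        }
    }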