...

  1. Clear timeout configurations - Configurations should be intuitive and do exactly what their names say.
  2. Have a bounded blocking time for send() - users should be able to bound how long a call to send() blocks.
  3. No message, batch or request should be able to reside in the producer without a configurable timeout - currently batches can sit in the accumulator for a long time (KAFKA-1788), and requests can be in flight until the TCP timeout (KAFKA-2120). In this KIP, we want to work out how to make sure messages, batches and requests are expired.

Public Interfaces

From discussions on the mailing list and within LinkedIn, we now have three options for the end state of the timeout configuration for KafkaProducer:

Option 1

  1. max.buffer.full.block.ms   - Replaces block.on.buffer.full. The maximum time to block when the buffer is full.
  2. metadata.fetch.timeout.ms  - Reuse the metadata timeout as batch.timeout.ms, because a batch that cannot be sent is essentially a case of metadata not being available.
  3. network.request.timeout.ms - This timeout is used when the producer sends requests to brokers through TCP connections. It specifies how long the producer should wait for the response.

With the above approach, we can achieve the following.

  • send() needs both metadata and buffer space to be available, so the blocking time of send() is bounded by (1) + (2), not counting time spent in a customized partitioner or serializer.
  • The time from send() until the response is received is generally bounded by linger.ms + (2) + (3), not taking retries into consideration.
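Both bounds above are plain sums of configured values. A minimal sketch with hypothetical class and method names; like the text, it ignores retries and time spent in a customized partitioner or serializer:

```java
/**
 * Hypothetical helper: worst-case bounds implied by option 1.
 * Ignores retries and time spent in a customized partitioner/serializer.
 */
final class Option1Bounds {
    private Option1Bounds() {}

    /** Bound on send() blocking: (1) max.buffer.full.block.ms + (2) metadata.fetch.timeout.ms. */
    static long maxSendBlockMs(long maxBufferFullBlockMs, long metadataFetchTimeoutMs) {
        return maxBufferFullBlockMs + metadataFetchTimeoutMs;
    }

    /** Bound from send() returning until the response arrives: linger.ms + (2) + (3). */
    static long maxTimeToResponseMs(long lingerMs, long metadataFetchTimeoutMs, long networkRequestTimeoutMs) {
        return lingerMs + metadataFetchTimeoutMs + networkRequestTimeoutMs;
    }
}
```

For example, with max.buffer.full.block.ms = 1000 and metadata.fetch.timeout.ms = 60000, send() blocks for at most about 61 seconds.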

The benefits of this option are:

  • Users can tune the producer timeouts at a finer granularity.
  • The configuration scope and usage are cleaner and more precise than in option 2 (it does not promise a per-message timeout while actually imposing the timeout of some messages on others).

The downsides of this option are:

  • Users need to be educated about metadata and the buffer.
  • The end-to-end timeout of a message is less explicit, because it is affected by retries, linger.ms, retry backoff time, request timeout, etc.

Option 2

  1. max.enqueue.block.ms - the maximum time producer.send() will block, including:
    1. metadata fetch time
    2. buffer full block time
    3. serialization time (customized serializer)
    4. partitioning time (customized partitioner)
  2. request.timeout.ms - the maximum time to wait for the response of a message after the message has been appended to the accumulator, including:
    1. linger.ms
    2. actual network RTT
    3. server replication time
    4. retries

(1) could alternatively be exposed through a new send() API, but this might be misleading, because users might think of it as the overall timeout for the message.

(2) is tricky to implement. We need to solve the following issues:

Issue 1: Once a message is in the accumulator, we lose per-message control.

Possible solution: always use the latest expiry time of the messages in a batch as the batch expiry time. In this case a message might expire a little later than requested, but it will never expire prematurely. The difference can be up to linger.ms. Similarly, after batches are drained into a request, we use the latest expiry time of all batches in the request as the expiry time of the request.
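The "latest expiry wins" rule can be sketched as follows. This assumes each message's deadline is its append time plus request.timeout.ms, as option 2 defines it; the class names are hypothetical and not the producer's actual internals.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of Issue 1's rule: a batch expires at the latest deadline of its messages,
 * and a request expires at the latest deadline of its batches. Names are hypothetical.
 */
final class LatestExpiry {
    private LatestExpiry() {}

    /** A batch that only tracks the expiry deadline (ms) of each appended message. */
    static final class Batch {
        private final List<Long> messageDeadlinesMs = new ArrayList<>();

        /** Option 2: a message's deadline is its append time plus request.timeout.ms. */
        void append(long appendTimeMs, long requestTimeoutMs) {
            messageDeadlinesMs.add(appendTimeMs + requestTimeoutMs);
        }

        /** Latest message deadline, so no message in the batch expires prematurely. */
        long expiryMs() {
            long latest = Long.MIN_VALUE; // an empty batch has no meaningful deadline
            for (long deadline : messageDeadlinesMs) {
                latest = Math.max(latest, deadline);
            }
            return latest;
        }
    }

    /** Request expiry: the latest expiry of all batches drained into the request. */
    static long requestExpiryMs(List<Batch> batches) {
        long latest = Long.MIN_VALUE;
        for (Batch batch : batches) {
            latest = Math.max(latest, batch.expiryMs());
        }
        return latest;
    }
}
```

Because the batch takes its latest member's deadline, the first message in a batch can outlive its own deadline by up to the time between the first and last append, which is where the "up to linger.ms" slack comes from.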

Issue 2: If there are retries, request.timeout.ms should ideally be at least linger.ms + replication.timeout.ms (which we can probably remove from the user config) + retries * retry.backoff.ms. This sanity check might be difficult to explain to users.

Possible solution: we only enforce request.timeout.ms > linger.ms + some default replication.timeout.ms. We fail the batch when either the retries are exhausted or request.timeout.ms is reached, whichever comes first.
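A sketch of the Issue 2 rules. It assumes a default replication timeout of 10 seconds, which is a placeholder because the text gives no value. The class and method names are hypothetical.

```java
/**
 * Hypothetical sketch of option 2's Issue 2 rules: a relaxed sanity check on
 * request.timeout.ms, and failing a batch on whichever limit is hit first.
 */
final class Option2Expiry {
    private Option2Expiry() {}

    /** Assumed stand-in for "some default replication.timeout.ms"; the value is illustrative. */
    static final long DEFAULT_REPLICATION_TIMEOUT_MS = 10_000L;

    /** Enforces request.timeout.ms > linger.ms + default replication timeout. */
    static void validate(long requestTimeoutMs, long lingerMs) {
        if (requestTimeoutMs <= lingerMs + DEFAULT_REPLICATION_TIMEOUT_MS) {
            throw new IllegalArgumentException("request.timeout.ms must be greater than linger.ms + "
                    + DEFAULT_REPLICATION_TIMEOUT_MS + " ms");
        }
    }

    /**
     * True if the batch should fail now.
     * @param failedAttempts number of send attempts that have already failed
     */
    static boolean shouldFail(int failedAttempts, int retries, long appendTimeMs,
                              long requestTimeoutMs, long nowMs) {
        boolean retriesExhausted = failedAttempts > retries; // first try plus `retries` retries
        boolean timedOut = nowMs - appendTimeMs >= requestTimeoutMs;
        return retriesExhausted || timedOut; // whichever comes first
    }
}
```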

The benefits of this option are:

  • Straightforward for users (assuming users do not mind messages occasionally expiring a little late).
  • Provides a more explicit bound on send() and on how long it takes before a message is either sent or failed.

The downsides of this option are:

  • Less accurate expiry time - a message can time out anywhere between (2) and 2*(2) + (1).
  • Might be a little difficult to tune for some users (e.g., some users might be willing to wait for the buffer but not for metadata).

Option 3

Similar to option 2, but with different semantics for request.timeout.ms.

We will expose the following timeout in KafkaProducer:

  1. max.enqueue.block.ms - the maximum time producer.send() will block, including:
    1. metadata fetch time
    2. buffer full block time
    3. serialization time (customized serializer)
    4. partitioning time (customized partitioner)
  2. request.timeout.ms - the maximum time to wait for the response of a message AFTER the batch is ready, including:
    1. actual network RTT
    2. server replication time

The differences between option 2 and option 3 are:

  1. When the countdown starts: in option 2, request.timeout.ms starts counting down when messages enter the queue; in option 3, it starts counting down only after a batch is ready.
  2. Whether retries are included: the request timeout in option 2 includes retries, while in option 3 it does not - each try gets a full request.timeout.ms. We do not include retries in option 3 because, if the request timeout included retries, we should actually keep retrying until the timeout. That would suggest removing the retries config, which causes problems for users who do not want duplicates. It is also somewhat strange to time out before the user-specified retries are exhausted.

...

  1. The request timeout will also be used:
    1. for batches in the accumulator that are ready but not drained because metadata is missing - we reuse the request timeout to expire these batches in the accumulator.
    2. when users call partitionsFor() - here we reuse the request timeout in the sense of "waiting for the metadata response of a topic". This is a little unusual, but it saves us another timeout configuration.
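For the first case, the accumulator check might look like the sketch below. Following option 3, it assumes the countdown starts when the batch becomes ready. The method name and parameters are hypothetical.

```java
/**
 * Hypothetical sketch of reusing request.timeout.ms in the accumulator: a batch that is
 * ready but cannot be drained because metadata for its partition is missing is expired
 * once it has been ready for at least request.timeout.ms.
 */
final class ReadyBatchExpiry {
    private ReadyBatchExpiry() {}

    static boolean shouldExpire(boolean ready, boolean metadataAvailable,
                                long readySinceMs, long requestTimeoutMs, long nowMs) {
        // Only batches stuck because metadata is missing are candidates here;
        // batches that can be drained are handled by the normal send path.
        return ready && !metadataAvailable && nowMs - readySinceMs >= requestTimeoutMs;
    }
}
```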

Among the above three options, we prefer option 3, for the following reasons:

  1. Only two configurations are needed.
  2. It meets all the motivations.
  3. It is easy for users to understand and easy to implement (all the batches in the same request have the same timeout).
  4. The per-message timeout is easy to compute - linger.ms + (retries + 1) * request.timeout.ms.
  5. It gives good configuration isolation between components. The request timeout will be a network client configuration and can be passed in to the accumulator.
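The per-message bound in point 4 is simple arithmetic. A sketch with a hypothetical helper name. Note that the formula, as stated, does not include retry.backoff.ms:

```java
/** Hypothetical helper for option 3's per-message bound. */
final class Option3Timeout {
    private Option3Timeout() {}

    /** linger.ms + (retries + 1) * request.timeout.ms; each try gets a full request.timeout.ms. */
    static long perMessageTimeoutMs(long lingerMs, int retries, long requestTimeoutMs) {
        // retries + 1L widens to long before multiplying, avoiding int overflow.
        return lingerMs + (retries + 1L) * requestTimeoutMs;
    }
}
```

For example, linger.ms = 5, retries = 3 and request.timeout.ms = 30000 give 5 + 4 * 30000 = 120005 ms.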

NOTE: In all the configurations above, the replication timeout has been taken off the list following a separate discussion thread on the mailing list. We are going to use the request timeout in ProducerRequest.

 

Taking option 3, we need to make the following changes:

...

```java
public static final String REQUEST_TIMEOUT_MS_CONFIG = "network.request.timeout.ms";
private static final String REQUEST_TIMEOUT_MS_DOC = "The configuration controls the maximum amount of time the producer will wait " +
                                                  "for the response of a request. If the response is not received before the timeout " +
                                                  "elapses, the producer will resend the request if necessary or fail the request if " +
                                                  "retries are exhausted.";
```

...

2. Add max.enqueue.timeout.ms to org.apache.kafka.clients.producer.KafkaProducer to bound the send() block time.

```java
public static final String MAX_ENQUEUE_TIMEOUT_MS_CONFIG = "max.enqueue.timeout.ms";
private static final String MAX_ENQUEUE_TIMEOUT_MS_DOC = "The configuration controls how long {@link KafkaProducer#send()} will block. " +
                                                      "The send method can be blocked because the buffer is full, metadata is not available, " +
                                                      "or because of a customized partitioner/serializer.";
```

...

  1. Associate a timestamp with each in-flight request and each retry.
  2. poll() will go through the in-flight requests and expire them if necessary.
  3. The timeout for poll() will need to take into account the request timeout of in-flight requests.
  4. When draining data out of the accumulator, we also check whether any batches should be expired.
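Steps 1 to 3 can be sketched as a small tracker. This is an illustration under stated assumptions: an in-memory list, millisecond timestamps, and hypothetical names. It is not Kafka's NetworkClient.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical tracker: timestamps in-flight requests, expires overdue ones, caps poll timeout. */
final class InFlightTracker {
    static final class InFlightRequest {
        final String id;
        final long sendTimeMs; // step 1: set when the request (or a retry of it) is sent

        InFlightRequest(String id, long sendTimeMs) {
            this.id = id;
            this.sendTimeMs = sendTimeMs;
        }
    }

    private final long requestTimeoutMs;
    private final List<InFlightRequest> inFlight = new ArrayList<>();

    InFlightTracker(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    /** Records a send; a retry is recorded the same way and gets a fresh timestamp. */
    void send(String id, long nowMs) {
        inFlight.add(new InFlightRequest(id, nowMs));
    }

    /** Step 2: removes and returns the ids of requests whose request timeout has elapsed. */
    List<String> expire(long nowMs) {
        List<String> expired = new ArrayList<>();
        for (Iterator<InFlightRequest> it = inFlight.iterator(); it.hasNext(); ) {
            InFlightRequest r = it.next();
            if (nowMs - r.sendTimeMs >= requestTimeoutMs) {
                expired.add(r.id);
                it.remove();
            }
        }
        return expired;
    }

    /** Step 3: never let poll() wait past the earliest in-flight deadline. */
    long pollTimeoutMs(long requestedTimeoutMs, long nowMs) {
        long timeout = requestedTimeoutMs;
        for (InFlightRequest r : inFlight) {
            long remaining = Math.max(0L, r.sendTimeMs + requestTimeoutMs - nowMs);
            timeout = Math.min(timeout, remaining);
        }
        return timeout;
    }
}
```

Giving each retry a fresh timestamp means every try gets a full request.timeout.ms, which matches option 3.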

The default request timeout will be set to a reasonable value, say 60 seconds.

...

  1. In 0.8.2.2 or 0.8.3, we will:
    1. Keep all the deprecated configurations.
    2. If the user sets TIMEOUT_CONFIG, we override REQUEST_TIMEOUT_CONFIG with it, show a deprecation warning and advertise the new configuration.
    3. If the user sets METADATA_FETCH_TIMEOUT_CONFIG, we override MAX_ENQUEUE_TIMEOUT_CONFIG with METADATA_FETCH_TIMEOUT_CONFIG and warn about the deprecation.
    4. If the user sets BLOCK_ON_BUFFER_FULL_CONFIG to true, we override MAX_ENQUEUE_TIMEOUT_CONFIG with Long.MAX_VALUE and warn about the deprecation.
  2. In 0.9, we will remove TIMEOUT_CONFIG, METADATA_FETCH_TIMEOUT_CONFIG and BLOCK_ON_BUFFER_FULL_CONFIG.
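The override rules above could be implemented roughly as follows. The sketch uses the string keys from this KIP's proposed constants together with the existing keys timeout.ms, metadata.fetch.timeout.ms and block.on.buffer.full. Several parts are assumptions: the class, the method, and the precedence when both metadata.fetch.timeout.ms and block.on.buffer.full=true are set.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the deprecated-config migration rules. */
final class DeprecatedConfigMapper {
    static final String TIMEOUT = "timeout.ms";
    static final String METADATA_FETCH_TIMEOUT = "metadata.fetch.timeout.ms";
    static final String BLOCK_ON_BUFFER_FULL = "block.on.buffer.full";
    static final String REQUEST_TIMEOUT = "network.request.timeout.ms";
    static final String MAX_ENQUEUE_TIMEOUT = "max.enqueue.timeout.ms";

    private DeprecatedConfigMapper() {}

    /** Returns a copy of the user config with deprecated settings applied; appends warnings. */
    static Map<String, String> migrate(Map<String, String> userConfig, List<String> warnings) {
        Map<String, String> effective = new HashMap<>(userConfig);
        if (userConfig.containsKey(TIMEOUT)) {
            effective.put(REQUEST_TIMEOUT, userConfig.get(TIMEOUT));
            warnings.add(TIMEOUT + " is deprecated; use " + REQUEST_TIMEOUT + " instead.");
        }
        if (userConfig.containsKey(METADATA_FETCH_TIMEOUT)) {
            effective.put(MAX_ENQUEUE_TIMEOUT, userConfig.get(METADATA_FETCH_TIMEOUT));
            warnings.add(METADATA_FETCH_TIMEOUT + " is deprecated; use " + MAX_ENQUEUE_TIMEOUT + " instead.");
        }
        // Checked last, so block.on.buffer.full=true wins over metadata.fetch.timeout.ms (assumed precedence).
        if (Boolean.parseBoolean(userConfig.get(BLOCK_ON_BUFFER_FULL))) {
            effective.put(MAX_ENQUEUE_TIMEOUT, String.valueOf(Long.MAX_VALUE));
            warnings.add(BLOCK_ON_BUFFER_FULL + " is deprecated; use " + MAX_ENQUEUE_TIMEOUT + " instead.");
        }
        return effective;
    }
}
```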

Rejected Alternatives

Options 1 and 2, described under Public Interfaces above, were considered and rejected in favor of option 3.