Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-2120

...

We will expose the following timeout in KafkaProducer:

  1. max.block.ms - the maximum time producer.send() and partitionsFor() will block.
    1. For send() it includes:
      1. metadata fetch time
      2. buffer full block time
      3. serialization time (customized serializer)
      4. partitioning time (customized partitioner)
    2. For partitionsFor() it includes:
      1. metadata fetch time
  2. request.timeout.ms - the maximum time to wait for the response to a message AFTER the batch is ready, including:
    1. actual network RTT
    2. server replication time
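
One way to read max.block.ms is as a single time budget shared by every blocking phase of send(), rather than a separate timeout per phase. The sketch below illustrates that reading under stated assumptions: the class BlockBudget, its remainingMs method and the unchecked BlockTimeoutException are hypothetical names, not the KafkaProducer implementation, and the clock is injected so the example is deterministic.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical helper: one max.block.ms budget shared by all blocking phases of send().
 * Each phase asks how much time is left instead of receiving its own full timeout.
 */
class BlockBudget {

    /** Unchecked, like Kafka's own TimeoutException; this class name is an assumption. */
    static class BlockTimeoutException extends RuntimeException {
        BlockTimeoutException(String message) { super(message); }
    }

    private final LongSupplier clockMs;  // injected clock so the sketch is testable
    private final long deadlineMs;

    BlockBudget(long maxBlockMs, LongSupplier clockMs) {
        this.clockMs = clockMs;
        long start = clockMs.getAsLong();
        // Saturate instead of overflowing when maxBlockMs is Long.MAX_VALUE ("block forever").
        this.deadlineMs = maxBlockMs > Long.MAX_VALUE - start ? Long.MAX_VALUE : start + maxBlockMs;
    }

    /** Time the next phase may still block; throws once the shared budget is spent. */
    long remainingMs(String phase) {
        long remaining = deadlineMs - clockMs.getAsLong();
        if (remaining <= 0) {
            throw new BlockTimeoutException("max.block.ms exceeded during " + phase);
        }
        return remaining;
    }

    public static void main(String[] args) {
        long[] now = {0L};                                   // simulated wall clock
        BlockBudget budget = new BlockBudget(1_000, () -> now[0]);

        now[0] += 300;                                       // metadata fetch took 300 ms
        System.out.println("buffer allocation may wait " + budget.remainingMs("buffer allocation") + " ms");

        now[0] += 800;                                       // buffer stayed full for 800 ms
        try {
            budget.remainingMs("serialization");
        } catch (BlockTimeoutException e) {
            System.out.println(e.getMessage());              // budget already exhausted
        }
    }
}
```

Because the deadline is fixed when send() is entered, a slow metadata fetch leaves less time for waiting on buffer space, which matches the list above where metadata fetch, buffer-full blocking, serialization and partitioning all count toward one limit.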

...

  1. request.timeout.ms only starts counting down after a batch is ready.
  2. request.timeout.ms does not include retries: each attempt gets the full request.timeout.ms. We did not include retries because, if the request timeout covered retries, the producer should really keep retrying until the timeout expires. That would suggest removing the retries configuration, which causes problems for users who do not want duplicates. It would also be a little strange to time out before the user-specified retries are exhausted.
  3. The request timeout will also be used in two other places:
    1. Batches in the accumulator that are ready but not drained because metadata is missing: we reuse the request timeout to expire these batches.
    2. Calls to partitionsFor(): we reuse the request timeout here in the sense of "waiting for the metadata response of a topic". This is a little strange, but it saves us another timeout configuration.
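
The timing rules above can be made concrete with a small model. This is a hedged sketch, not Kafka's Sender or RecordAccumulator code: the class RequestTimeoutSketch, the Batch type and the methods send, checkInFlight and expiredInAccumulator are hypothetical names chosen for illustration. It shows the timer starting only when a batch is ready, every attempt receiving a fresh request.timeout.ms, retries being counted separately, and ready-but-undrained batches expiring in the accumulator.

```java
/**
 * Hypothetical sketch of the request.timeout.ms rules; all names are illustrative,
 * not Kafka's Sender/RecordAccumulator code.
 */
class RequestTimeoutSketch {

    enum Outcome { WAITING, RETRY, FAIL }

    static final class Batch {
        final long readyTimeMs;       // the clock only starts once the batch is ready
        long attemptStartMs = -1;     // -1 while the batch is still in the accumulator
        int attempts = 0;

        Batch(long readyTimeMs) { this.readyTimeMs = readyTimeMs; }
    }

    private final long requestTimeoutMs;
    private final int retries;

    RequestTimeoutSketch(long requestTimeoutMs, int retries) {
        this.requestTimeoutMs = requestTimeoutMs;
        this.retries = retries;
    }

    /** A ready batch that could not be drained (e.g. metadata missing) expires after request.timeout.ms. */
    boolean expiredInAccumulator(Batch batch, long nowMs) {
        return batch.attemptStartMs < 0 && nowMs - batch.readyTimeMs > requestTimeoutMs;
    }

    /** Each attempt, including every retry, starts a fresh, full request.timeout.ms. */
    void send(Batch batch, long nowMs) {
        batch.attemptStartMs = nowMs;
        batch.attempts++;
    }

    /** Times out only the current attempt; retries are counted separately from the timeout. */
    Outcome checkInFlight(Batch batch, long nowMs) {
        if (nowMs - batch.attemptStartMs <= requestTimeoutMs) {
            return Outcome.WAITING;
        }
        // attempts = 1 + retries used so far
        return batch.attempts <= retries ? Outcome.RETRY : Outcome.FAIL;
    }

    public static void main(String[] args) {
        RequestTimeoutSketch sketch = new RequestTimeoutSketch(30_000, 1);
        Batch batch = new Batch(0);                                     // batch became ready at t = 0
        System.out.println(sketch.expiredInAccumulator(batch, 10_000)); // false
        sketch.send(batch, 10_000);
        System.out.println(sketch.checkInFlight(batch, 35_000));        // WAITING
        System.out.println(sketch.checkInFlight(batch, 41_000));        // RETRY
        sketch.send(batch, 41_000);                                     // the retry gets its own 30 s
        System.out.println(sketch.checkInFlight(batch, 60_000));        // WAITING
        System.out.println(sketch.checkInFlight(batch, 72_000));        // FAIL
    }
}
```

In this run the batch only fails at t = 72 s, well past a single 30 s request timeout, because each retry received a full timeout of its own.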

The benefits of this approach are:

...

```java
public static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";
private static final String REQUEST_TIMEOUT_MS_DOC = "The configuration controls the maximum amount of time the producer will wait " +
                                                     "for the response of a request. If the response is not received before the timeout " +
                                                     "elapses, the producer will resend the request if necessary or fail the request if " +
                                                     "retries are exhausted.";
```

...

```java
public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long <code>KafkaProducer.send()</code> and " +
                                               "<code>KafkaProducer.partitionsFor()</code> will block. send() can block because the buffer " +
                                               "is full, metadata is not available, or a customized partitioner/serializer is slow; " +
                                               "partitionsFor() can block while waiting for metadata.";
```
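
Once the new settings exist, a user sets them like any other producer property. The sketch below assumes the key names that released Kafka producers use, max.block.ms and request.timeout.ms. The helper class ProducerTimeoutProps and its timeouts method are hypothetical, and rejecting negative values is an assumption of this sketch. The resulting Properties would be merged with the usual settings (bootstrap.servers, serializers) before being passed to the KafkaProducer constructor.

```java
import java.util.Properties;

/** Hypothetical helper: collects the two new producer timeouts into a Properties object. */
class ProducerTimeoutProps {

    static Properties timeouts(long maxBlockMs, long requestTimeoutMs) {
        // Assumed validation for this sketch: negative timeouts make no sense.
        if (maxBlockMs < 0 || requestTimeoutMs < 0) {
            throw new IllegalArgumentException("timeouts must be non-negative");
        }
        Properties props = new Properties();
        props.setProperty("max.block.ms", Long.toString(maxBlockMs));             // bounds send() and partitionsFor()
        props.setProperty("request.timeout.ms", Long.toString(requestTimeoutMs)); // bounds each request attempt
        return props;
    }

    public static void main(String[] args) {
        Properties props = timeouts(60_000, 30_000);
        // The caller would add bootstrap.servers, serializers, etc. before creating the producer.
        System.out.println(props.getProperty("max.block.ms"));
        System.out.println(props.getProperty("request.timeout.ms"));
    }
}
```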

...

  1. In 0.8.2.2 or 0.8.3, we will:
    1. Keep all the deprecated configurations.
    2. If the user sets TIMEOUT_CONFIG, override REQUEST_TIMEOUT_MS_CONFIG with its value, show a deprecation warning, and advertise the new configuration.
    3. If the user sets METADATA_FETCH_TIMEOUT_CONFIG, override MAX_BLOCK_MS_CONFIG with the value of METADATA_FETCH_TIMEOUT_CONFIG and warn about the deprecation.
    4. If the user sets BLOCK_ON_BUFFER_FULL_CONFIG to true, override MAX_BLOCK_MS_CONFIG with Long.MAX_VALUE and warn about the deprecation.
    5. If the user sets BLOCK_ON_BUFFER_FULL_CONFIG to true and also sets METADATA_FETCH_TIMEOUT_CONFIG, we will not honor METADATA_FETCH_TIMEOUT_CONFIG.
      We will warn the user about this and advise setting MAX_BLOCK_MS_CONFIG explicitly.
  2. In 0.9, we will remove TIMEOUT_CONFIG, METADATA_FETCH_TIMEOUT_CONFIG and BLOCK_ON_BUFFER_FULL_CONFIG.
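
The 0.8.2.2/0.8.3 compatibility rules amount to a small translation step applied before the producer reads its configuration. The sketch below is one hedged reading of them, not the actual KafkaProducer code: the class DeprecatedTimeoutMapper and its translate method are hypothetical, the string values for TIMEOUT_CONFIG, METADATA_FETCH_TIMEOUT_CONFIG and BLOCK_ON_BUFFER_FULL_CONFIG are assumed to be the 0.8.2 producer keys, and the new keys are assumed to be max.block.ms and request.timeout.ms.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of the 0.8.x compatibility rules: map deprecated producer
 * settings onto the new ones and collect the warnings that should be logged.
 */
class DeprecatedTimeoutMapper {
    // Deprecated names (assumed to match the 0.8.2 Java producer's config keys).
    static final String TIMEOUT_CONFIG = "timeout.ms";
    static final String METADATA_FETCH_TIMEOUT_CONFIG = "metadata.fetch.timeout.ms";
    static final String BLOCK_ON_BUFFER_FULL_CONFIG = "block.on.buffer.full";
    // New names (assumed).
    static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";
    static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";

    static Map<String, Object> translate(Map<String, Object> user, List<String> warnings) {
        // Deprecated keys are kept in the result; only the new keys are overridden.
        Map<String, Object> resolved = new HashMap<>(user);

        if (user.containsKey(TIMEOUT_CONFIG)) {
            resolved.put(REQUEST_TIMEOUT_MS_CONFIG, user.get(TIMEOUT_CONFIG));
            warnings.add(TIMEOUT_CONFIG + " is deprecated, use " + REQUEST_TIMEOUT_MS_CONFIG + " instead");
        }

        // Accepts Boolean.TRUE or the string "true"; an absent key counts as false.
        boolean blockOnBufferFull = Boolean.parseBoolean(String.valueOf(user.get(BLOCK_ON_BUFFER_FULL_CONFIG)));
        boolean hasMetadataTimeout = user.containsKey(METADATA_FETCH_TIMEOUT_CONFIG);

        if (blockOnBufferFull) {
            // block.on.buffer.full=true wins: block forever, even if a metadata timeout was given.
            resolved.put(MAX_BLOCK_MS_CONFIG, Long.MAX_VALUE);
            warnings.add(BLOCK_ON_BUFFER_FULL_CONFIG + " is deprecated, use " + MAX_BLOCK_MS_CONFIG + " instead");
            if (hasMetadataTimeout) {
                warnings.add(METADATA_FETCH_TIMEOUT_CONFIG + " is ignored because " + BLOCK_ON_BUFFER_FULL_CONFIG
                        + "=true; set " + MAX_BLOCK_MS_CONFIG + " explicitly");
            }
        } else if (hasMetadataTimeout) {
            resolved.put(MAX_BLOCK_MS_CONFIG, user.get(METADATA_FETCH_TIMEOUT_CONFIG));
            warnings.add(METADATA_FETCH_TIMEOUT_CONFIG + " is deprecated, use " + MAX_BLOCK_MS_CONFIG + " instead");
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, Object> user = new HashMap<>();
        user.put(BLOCK_ON_BUFFER_FULL_CONFIG, "true");
        user.put(METADATA_FETCH_TIMEOUT_CONFIG, 5_000L);

        List<String> warnings = new ArrayList<>();
        Map<String, Object> resolved = translate(user, warnings);
        System.out.println(resolved.get(MAX_BLOCK_MS_CONFIG)); // 9223372036854775807
        warnings.forEach(System.out::println);
    }
}
```

Checking BLOCK_ON_BUFFER_FULL_CONFIG before METADATA_FETCH_TIMEOUT_CONFIG is what makes the last rule fall out naturally: when both are set, the Long.MAX_VALUE override wins and the metadata timeout only produces a warning.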

...