
Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-2120

...

This KIP is only trying to solve the problem after a request has been constructed. For the new producer specifically, there are situations where record batches can sit in the accumulator for a very long time before they are drained out of the RecordAccumulator (KAFKA-1788). Since the handling for those situations is more state-based than time-based, we are not trying to address them in this KIP.

We want to achieve the following with these timeout adjustments:

  1. Clear timeout configurations - configurations should be intuitive and do exactly what they mean.
  2. Bounded blocking time for send() - users should be able to bound the blocking time of a send() call.
  3. No message/batch/request can reside in the producer without a configurable timeout - currently batches can sit in the accumulator for a long time (KAFKA-1788) and requests can be in flight until the TCP timeout (KAFKA-2120). In this KIP, we want to see how we can make sure messages/batches/requests are expired.

Public Interfaces

We will expose the following timeouts in KafkaProducer (a configuration sketch follows the list):

  1. max.block.ms - the maximum time producer.send() and partitionsFor() will block.
    1. For send() it includes:
      1. metadata fetch time
      2. buffer full block time
      3. serialization time (customized serializer)
      4. partitioning time (customized partitioner)
    2. For partitionsFor() it includes:
      1. metadata fetch time
  2. request.timeout.ms - the maximum time to wait for the response of a message AFTER the batch is ready, including:
    1. actual network RTT
    2. server replication time
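For illustration, a producer configured with these two timeouts might look like the following sketch. The values are arbitrary examples, not recommendations:

Code Block
languagejava
import java.util.Properties;

public class ProducerTimeoutConfigExample {
    public static Properties timeoutProps() {
        Properties props = new Properties();
        // Bound how long producer.send() and partitionsFor() may block
        // (metadata fetch, buffer-full wait, serialization, partitioning).
        props.put("max.block.ms", "60000");
        // Bound how long to wait for the response of a request after the
        // batch is ready; applies to each try separately.
        props.put("request.timeout.ms", "30000");
        return props;
    }
}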

Some things to clarify:

  1. request.timeout.ms only starts counting down after a batch is ready.
  2. request.timeout.ms does not include retries - each try gets a full request.timeout.ms. The reason we did not include retries is that if the request timeout covered retries, we should actually keep retrying until the timeout is reached. That implies removing the retries configuration, which causes problems for users who do not want duplicates. It is also a little strange to time out before the user-specified retries are exhausted.
  3. The request timeout will also be applied to batches in the accumulator that are ready but not drained because metadata is missing - we are reusing the request timeout to expire batches in the accumulator.

The benefits of this approach are:

  1. Only two configurations are needed.
  2. Meets all the motivations.
  3. Easy for users to understand and easy to implement (all the batches in the same request will have the same timeout).
  4. The per-message timeout is easy to compute - linger.ms + (retries + 1) * request.timeout.ms (see the worked example after this list).
  5. Good configuration isolation between components: the request timeout will be a network client configuration and can be passed in to the accumulator.
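For example (with arbitrary values), if linger.ms = 100, retries = 2 and request.timeout.ms = 30000, the per-message bound is 100 + (2 + 1) * 30000 = 90100 ms, i.e. just over 90 seconds.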

NOTE: In all the configurations above, replication timeout has been taken off the list according to a separate discussion thread on the mailing list. We are going to use the request timeout in ProducerRequest.

We need to have the following changes:

1. Add the following new configuration to org.apache.kafka.clients.producer.ProducerConfig and org.apache.kafka.clients.consumer.ConsumerConfig:

Code Block
languagejava
public static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";
private static final String REQUEST_TIMEOUT_MS_DOC = "The configuration controls the maximum amount of time the producer will wait " +
                                                     "for the response of a request. If the response is not received before the timeout " +
                                                     "elapses the producer will resend the request if necessary or fail the request if " +
                                                     "retries are exhausted.";

If a request times out and retries have not been exhausted, the request will be retried.

When a request times out and retries are exhausted, an org.apache.kafka.common.errors.TimeoutException will be put in the returned future of the request and the callback will be fired.
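For illustration, user code might observe such a timeout as in the following sketch; the topic name and configuration values are arbitrary examples:

Code Block
languagejava
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;

public class TimeoutCallbackExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("request.timeout.ms", "30000");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The callback fires with a TimeoutException once the request has
            // timed out and retries are exhausted; the returned future would
            // throw the same exception on get().
            producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, exception) -> {
                if (exception instanceof TimeoutException)
                    System.err.println("Request timed out: " + exception.getMessage());
            });
        }
    }
}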

There is an existing configuration TIMEOUT_CONFIG, which is really a server-side timeout specifying the maximum amount of time brokers can wait before sending back the response. In order to avoid future confusion, for KafkaProducer we might also want to change TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG.

Discussion on end state of timeouts in producer

We want to see if we can reduce the number of configurations exposed to users, so we have less confusion while still having the flexibility to support different use cases.

Timeouts might be needed in the following three scenarios, so we will have at least three timeout configurations.

A. when user calls producer.send()

B. batches in accumulator

C. after request is sent

Currently we might have the following timeouts in the producer configurations, with the scenario in parentheses.

  1. (A) metadata.fetch.timeout.ms - The maximum time producer will wait for metadata to be ready in producer.send()
  2. (C) replication.timeout.ms - This is a server side configuration. It defines how long a server will wait for the records to be replicated to followers.
  3. (C) network.request.timeout.ms - This timeout is used when producer sends request to brokers through TCP connections. It specifies how long the producer should wait for the response.

From what I can see, network.request.timeout.ms has a clear purpose and we probably want to keep it.

replication.timeout.ms is a legacy configuration; we might be able to get rid of it, but let's keep it for now.

For (B), we need another timeout to expire batches in the accumulator, separate from the batching-oriented linger.ms. That means we probably need the following configuration:

      4. (B) batch.timeout.ms - In some cases, the leader broker information for a batch could be unavailable after the batch is ready to be sent (i.e., the batch is full or linger.ms has been reached). This configuration defines the maximum time the producer will wait for the necessary broker information before it fails the batches.

For (A), it makes sense to have a dedicated timeout for the send() method to bound its blocking time. That gives another configuration:

      5. (A) max.send.block.ms - This timeout defines how long the producer.send() method can block (we will enforce this for both metadata fetch and blocking on buffer full in a separate ticket).

With the above, we have the following proposals and want to see which one makes more sense:

Proposal 1:

  • (A/B) metadata.fetch.timeout.ms - reuse the metadata timeout as batch.timeout.ms, because batch expiry in the accumulator is essentially metadata being unavailable.
  • (C) replication.timeout.ms
  • (C) network.request.timeout.ms

Proposal 2:

  • (A) max.send.block.ms
  • (B) batch.timeout.ms
  • (C) replication.timeout.ms
  • (C) network.request.timeout.ms

Proposal 1 has four configurations but does not guarantee a bound on the blocking time of send() when blocking on buffer full is turned on, so the user needs to catch the exception outside send() and retry if the buffer is full.

Proposal 2 has five configurations but guarantees the blocking time of send().

2. Add max.block.ms to org.apache.kafka.clients.producer.ProducerConfig to bound the send() blocking time.

Code Block
languagejava
public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long {@link KafkaProducer#send()} will block. " +
                                               "The send method can be blocked because of buffer full, metadata not available, or a " +
                                               "customized partitioner/serializer.";

3. Remove METADATA_FETCH_TIMEOUT_CONFIG, TIMEOUT_CONFIG, BLOCK_ON_BUFFER_FULL_CONFIG.

Proposed Changes

Because the NetworkClient works in an async way, we need to keep track of the send time for each request. The implementation will do the following (a sketch follows the list):

  1. Associate a timestamp with each in-flight request and retry.
  2. poll() will go through the in-flight requests and expire those requests if necessary.
  3. The timeout for poll() will need to consider the request timeout for in-flight requests.
  4. When draining data out of the accumulator, we also check whether some batches should be expired.
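A minimal sketch of this bookkeeping, assuming illustrative names (RequestExpirationSketch, expireStaleRequests, etc.) rather than the actual NetworkClient internals:

Code Block
languagejava
import java.util.ArrayDeque;
import java.util.Deque;

public class RequestExpirationSketch {
    // Requests are appended on send, so the deque stays ordered by send time.
    private final Deque<Long> inFlightSendTimesMs = new ArrayDeque<>();
    private final long requestTimeoutMs;

    public RequestExpirationSketch(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    // Step 1: record a timestamp when a request (or retry) is sent.
    public void onSend(long nowMs) {
        inFlightSendTimesMs.addLast(nowMs);
    }

    // Step 2: called from poll() - expire requests whose timeout has elapsed.
    // An expired request would be retried or failed with a TimeoutException.
    public int expireStaleRequests(long nowMs) {
        int expired = 0;
        while (!inFlightSendTimesMs.isEmpty()
                && nowMs - inFlightSendTimesMs.peekFirst() >= requestTimeoutMs) {
            inFlightSendTimesMs.pollFirst();
            expired++;
        }
        return expired;
    }

    // Step 3: poll() must wake up no later than the next request expiration.
    public long pollTimeout(long nowMs, long defaultTimeoutMs) {
        if (inFlightSendTimesMs.isEmpty())
            return defaultTimeoutMs;
        long untilNextExpiry = inFlightSendTimesMs.peekFirst() + requestTimeoutMs - nowMs;
        return Math.max(0, Math.min(defaultTimeoutMs, untilNextExpiry));
    }
}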

The default request timeout will be set to a reasonable value, say 60 seconds.

...

In most cases, a metadata refresh should be able to pick up the new leader if a broker is down. If a broker is not down but just slow, as long as the request timeout is set to a reasonable value, we should not see a dramatic increase in the number of TCP connections.

Compatibility, Deprecation, and Migration Plan

To remove these configurations, we will do the following:

  1. In 0.8.2.2 or 0.8.3:
    1. Keep all the deprecated configurations there.
    2. If the user sets TIMEOUT_CONFIG, we override REQUEST_TIMEOUT_MS_CONFIG with it, show a deprecation warning and advertise the new configuration.
    3. If the user sets METADATA_FETCH_TIMEOUT_CONFIG, we override MAX_BLOCK_MS_CONFIG with METADATA_FETCH_TIMEOUT_CONFIG and warn about the deprecation.
    4. If the user sets BLOCK_ON_BUFFER_FULL_CONFIG to true, we override MAX_BLOCK_MS_CONFIG with Long.MAX_VALUE and warn about the deprecation.
    5. If the user sets BLOCK_ON_BUFFER_FULL_CONFIG to true and also sets METADATA_FETCH_TIMEOUT_CONFIG, we will not honor METADATA_FETCH_TIMEOUT_CONFIG. We will warn the users regarding the same, and warn them to use MAX_BLOCK_MS_CONFIG explicitly.
  2. In 0.9, we will remove TIMEOUT_CONFIG, METADATA_FETCH_TIMEOUT_CONFIG and BLOCK_ON_BUFFER_FULL_CONFIG from the configuration.

The changes should be backward compatible.

Rejected Alternatives

Option 1

  1. max.buffer.full.block.ms   - To replace block.on.buffer.full. The max time to block when buffer is full.
  2. metadata.fetch.timeout.ms  - reuse metadata timeout as batch.timeout.ms because it is essentially metadata not available.
  3. network.request.timeout.ms - This timeout is used when producer sends request to brokers through TCP connections. It specifies how long the producer should wait for the response.

With the above approach, we can achieve the following.

  • send() needs metadata and buffer space available, so we can bound the blocking time for send() by (1) + (2), not considering customized partitioner and serializer.
  • The time from send() until the response is received is generally bounded by linger.ms + (2) + (3), not taking retries into consideration.

The benefits of this option are:

  • Users can tune the producer timeouts more finely.
  • Configuration scope and usage are cleaner and more precise compared with Option 2 (which does not commit to a per-message timeout while actually shoehorning the timeout of some messages onto others).

The downsides of this option are:

  • Need to educate users about metadata and the buffer.
  • The overall timeout is less explicit and will be affected by retries, linger.ms, retry backoff time, request timeout, etc.

Option 2

  1. max.enqueue.block.ms - the maximum time producer.send() will block, including:
    1. metadata fetch time
    2. buffer full block time
    3. serialization time (customized serializer)
    4. partitioning time (customized partitioner)
  2. request.timeout.ms - the maximum time to wait for the response of a message after the message has been appended to the accumulator, including:
    1. linger.ms
    2. actual network RTT
    3. server replication time
    4. retries

(1) could alternatively be exposed via a new send() API, but this might be misleading, as users might think of it as the overall timeout for the message.

(2) is tricky to implement. We need to solve the following issues:

Issue 1: Once a message is in the accumulator, we lose per-message control.

Possible solution: One implementation is to always use the latest expiry time among the messages in a batch as the batch expiry time. In this case, a message might be expired a little later than requested, but will never be expired prematurely; the difference can be up to linger.ms. Similarly, after batches are drained into a request, we use the latest expiry time of all batches in the request as the expiry time of the request.
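A minimal sketch of this "latest expiry wins" rule, assuming illustrative helper names rather than the actual accumulator code (the lists are assumed non-empty):

Code Block
languagejava
import java.util.Collections;
import java.util.List;

public class ExpiryRule {
    // A batch expires only when its latest-expiring message would expire,
    // so no message is expired prematurely (it may expire up to linger.ms late).
    public static long batchExpiryMs(List<Long> messageExpiryTimesMs) {
        return Collections.max(messageExpiryTimesMs);
    }

    // Likewise, a request expires at the latest expiry of its batches.
    public static long requestExpiryMs(List<Long> batchExpiryTimesMs) {
        return Collections.max(batchExpiryTimesMs);
    }
}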

Issue 2: If there are retries, ideally request.timeout.ms should be at least linger.ms + replication.timeout.ms (which we can probably remove from the user config) + retries * retry.backoff.ms. This sanity check might be difficult to explain to users.

Possible solution: We only enforce request.timeout.ms > linger.ms + some default replication.timeout.ms. We fail the batch when either retries are exhausted or request.timeout.ms is reached, whichever comes first.
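A sketch of how such a sanity check might look, with illustrative names and a hypothetical default replication timeout:

Code Block
languagejava
public class RequestTimeoutSanityCheck {
    // Hypothetical default, used only for the lower-bound check.
    private static final long DEFAULT_REPLICATION_TIMEOUT_MS = 10000;

    // Enforce request.timeout.ms > linger.ms + default replication timeout.
    public static void validate(long requestTimeoutMs, long lingerMs) {
        if (requestTimeoutMs <= lingerMs + DEFAULT_REPLICATION_TIMEOUT_MS)
            throw new IllegalArgumentException(
                "request.timeout.ms must be greater than linger.ms + "
                    + DEFAULT_REPLICATION_TIMEOUT_MS + " ms");
    }
}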

The benefits of this option are:

  • Straightforward for users (assuming users do not care about slightly belated expiration of messages).
  • Provides a more explicit bound on send() and on how long it takes until a message is either sent or failed.

The downsides of this option are:

  • Less accurate expiry time - a message can time out anywhere between (2) and 2*(2) + (1).
  • Might be a little difficult to tune for some users (e.g., some users might be willing to wait for buffer space but not for metadata).

...