Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-2120

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In the old producer/consumer, a socket timeout is associated with each request: the producer/consumer sends a request and then waits for the response. If no response is received from a broker within the specified timeout, the request fails.

...

  1. Clear timeout configurations - Configurations should be intuitive and do exactly what they mean.
  2. Have bounded blocking time for send() - users should be able to have a bounded blocking time when they call send().
  3. No message/batch/request should be able to reside in the producer without a configurable timeout - currently batches can sit in the accumulator for a long time (KAFKA-1788), and requests can be in flight until the TCP timeout (KAFKA-2120). In this KIP, we want to see how we can make sure that messages/batches/requests expire.

Public Interfaces

We will expose the following timeouts in KafkaProducer:

  1. max.block.ms - the maximum time producer.send() and partitionsFor() will block.
    1. For send() it includes:
      1. metadata fetch time
      2. buffer full block time
      3. serialization time (customized serializer)
      4. partitioning time (customized partitioner)
    2. For partitionsFor() it includes:
      1. metadata fetch time
  2. request.timeout.ms - the maximum time to wait for the response of a message AFTER the batch is ready, including:
    1. actual network RTT
    2. server replication time

Some things to clarify:

  1. request.timeout.ms only starts counting down after a batch is ready.
  2. request.timeout.ms does not include retries - each try has a full request.timeout.ms. We did not include retries because, if the request timeout covered them, we would have to keep retrying until the timeout. That would mean removing the retries config, which causes problems for users who don't want duplicates. It is also a little strange to time out before the user-specified retries are exhausted.
  3. request.timeout.ms will also be used for batches in the accumulator that are ready but cannot be drained because metadata is missing - we are reusing the request timeout to expire batches in the accumulator.

The benefits of this approach are:

  1. Only two configurations needed
  2. Meet all the motivations
  3. Easy for user to understand and easy to implement (all the batches in the same request will have same timeout)
  4. The per message timeout is easy to compute - linger.ms + (retries + 1) * request.timeout.ms.
  5. Good configuration isolation between components. request timeout will be a network client configuration and can be passed in to accumulator.
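The per-message bound in item 4 can be worked through with a small sketch. The class and method names below are hypothetical and the sample values are illustrative only; the arithmetic is just the formula stated above.

```java
// Sketch only: PerMessageTimeout and upperBoundMs are hypothetical names,
// not part of the Kafka client API.
final class PerMessageTimeout {

    /** Bound from the text: linger.ms + (retries + 1) * request.timeout.ms. */
    static long upperBoundMs(long lingerMs, int retries, long requestTimeoutMs) {
        // Each try (the first attempt plus every retry) gets a full request.timeout.ms.
        return lingerMs + (retries + 1L) * requestTimeoutMs;
    }

    public static void main(String[] args) {
        // Illustrative values: linger.ms=5, retries=3, request.timeout.ms=30000.
        long bound = upperBoundMs(5, 3, 30_000);
        System.out.println("per-message timeout bound (ms): " + bound);
    }
}
```

With these sample values, four tries of 30 seconds each plus 5 ms of linger give a bound of 120005 ms.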

NOTE: In all the configurations above, replication timeout has been taken off the list according to a separate discussion thread on mailing list. We are going to use request timeout in ProducerRequest.

We need to have the following changes:

1. Add the following new configuration to org.apache.kafka.clients.producer.ProducerConfig and org.apache.kafka.clients.consumer.ConsumerConfig:

Code Block
languagejava
public static final String REQUEST_TIMEOUT_MS_CONFIG = "request.timeout.ms";
private static final String REQUEST_TIMEOUT_MS_DOC = "The configuration controls the maximum amount of time the producer will wait " +
                                                  "for the response of a request. If the response is not received before the timeout " +
                                                  "elapses the producer will resend the request if necessary or fail the request if " +
                                                  "retries are exhausted.";

If a request times out and retries have not been exhausted, the request will be retried.

When a request times out and retries are exhausted, an org.apache.kafka.common.errors.TimeoutException will be put in the returned future of the request and the callback will be fired.

2. There is an existing configuration TIMEOUT_CONFIG, which is really a server-side timeout specifying the maximum amount of time brokers can wait before sending back the response. It is a little strange to have this configuration on the client side. We are going to remove this configuration from the producer side and have it on the broker side. It can be a log configuration.

3. To give users a bounded blocking time for producer.send(), we are going to change block.on.buffer.full to max.buffer.full.block.ms.

Discussion on end state of timeouts in producer

We want to see if we can reduce the number of configurations exposed to users, so that there is less confusion, while still keeping the flexibility to support different use cases.

A timeout might be needed in the following three scenarios, so we will have at least three timeout configurations.

A. when user calls producer.send()

B. batches in accumulator

C. after request is sent

Currently we have the following timeouts in the producer configuration, with the scenario in parentheses.

  • (A) metadata.fetch.timeout.ms - The maximum time producer will wait for metadata to be ready in producer.send()
  • (C) replication.timeout.ms (to be removed) - This is a server side configuration. It defines how long a server will wait for the records to be replicated to followers.

replication.timeout.ms is a legacy configuration.

We are adding another configuration:

  • (C) network.request.timeout.ms - This timeout is used when producer sends request to brokers through TCP connections. It specifies how long the producer should wait for the response.

From what I can see, network.request.timeout.ms has a clear purpose and should be a standalone configuration.

We also want to achieve the following:

  • For (A) we want to have a bounded blocking time for send() method.
  • For (B) we need another timeout to expire the batches in accumulator in addition to batching purpose.

With the above, we have the following configuration proposal for the end state:

  • (A) max.buffer.full.block.ms - To replace block.on.buffer.full. The max time to block when the buffer is full.
    • Use case: bound the blocking time of send(), together with metadata.fetch.timeout.ms.
  • (A/B) metadata.fetch.timeout.ms - reuse the metadata timeout to expire batches in the accumulator, because a batch stuck there essentially means metadata is not available.
    • Use case 1: bound the time to wait for metadata in send().
    • Use case 2: prevent batches without metadata from sitting in the accumulator for too long.
  • (C) network.request.timeout.ms - the max time to wait for response.
    • Use case: to tolerate some slow server but not wait for a response forever when a broker is down.

From user's perspective:

...

Add max.block.ms to org.apache.kafka.clients.producer.KafkaProducer to bound the send() block time.

Code Block
languagejava
public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";
private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long {@link KafkaProducer#send()} will block. " +
                                                      "The send method can be blocked because of buffer full, metadata not available, " +
                                                      "customized partitioner/serializer.";

3. Remove METADATA_FETCH_TIMEOUT_CONFIG, TIMEOUT_CONFIG, BLOCK_ON_BUFFER_FULL_CONFIG.

Proposed Changes

Because the NetworkClient works asynchronously, we need to keep track of the send time of each request. The implementation will do the following:

  1. Associate a timestamp with each in-flight request and each retry.
  2. poll() will go through the in-flight requests and expire them if necessary.
  3. The timeout for poll() will need to take the request timeout of in-flight requests into account.
  4. When draining data out of the accumulator, we also check whether some batches should be expired.
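Steps 1 to 3 can be sketched as follows. This is a minimal illustration under stated assumptions, not the actual NetworkClient code: InFlightTracker, InFlightRequest, and all method names are hypothetical, and time is passed in explicitly to keep the example deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Sketch only: every name here is hypothetical, not taken from the Kafka code base.
final class InFlightTracker {

    /** Step 1: each in-flight request (or retry) carries its send timestamp. */
    static final class InFlightRequest {
        final int correlationId;
        final long sendTimeMs;

        InFlightRequest(int correlationId, long sendTimeMs) {
            this.correlationId = correlationId;
            this.sendTimeMs = sendTimeMs;
        }
    }

    private final Deque<InFlightRequest> inFlight = new ArrayDeque<>();
    private final long requestTimeoutMs;

    InFlightTracker(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    void send(int correlationId, long nowMs) {
        inFlight.addLast(new InFlightRequest(correlationId, nowMs));
    }

    /** Step 2: remove requests whose timeout has elapsed and return their ids. */
    List<Integer> expire(long nowMs) {
        List<Integer> expired = new ArrayList<>();
        Iterator<InFlightRequest> it = inFlight.iterator();
        while (it.hasNext()) {
            InFlightRequest request = it.next();
            if (nowMs - request.sendTimeMs >= requestTimeoutMs) {
                expired.add(request.correlationId);
                it.remove();
            }
        }
        return expired;
    }

    /** Step 3: cap the poll timeout so poll() wakes up when the next request expires. */
    long pollTimeoutMs(long requestedTimeoutMs, long nowMs) {
        long timeout = requestedTimeoutMs;
        for (InFlightRequest request : inFlight) {
            long remaining = Math.max(0, request.sendTimeMs + requestTimeoutMs - nowMs);
            timeout = Math.min(timeout, remaining);
        }
        return timeout;
    }
}
```

A caller would invoke expire(now) on every poll() and either retry or fail each returned request, depending on whether its retries are exhausted.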

The default request timeout will be set to a reasonable value, say 60 seconds.

Actions after request timeout

When the request timeout has been reached, we do the following:

...

In most cases, a metadata refresh should be able to pick up the new leader if a broker is down. If a broker is not down but just slow, we should not see a dramatic increase in TCP connections as long as the request timeout is set to a reasonable value.

...

Compatibility, Deprecation, and Migration Plan

To remove these configurations, we will do the following:

...

  1. In 0.8.2.2 or 0.8.3, we will leave TIMEOUT_CONFIG there. Producer will still send ProducerRequest V0. In this release:
    1. Keep all the deprecated configurations there.
    2. If the user sets TIMEOUT_CONFIG, we override REQUEST_TIMEOUT_CONFIG with it, show a deprecation warning, and advertise the new configuration.
    3. If the user sets METADATA_FETCH_TIMEOUT_CONFIG, we override MAX_BLOCK_MS_CONFIG with METADATA_FETCH_TIMEOUT_CONFIG and warn about the deprecation.
    4. If the user sets BLOCK_ON_BUFFER_FULL_CONFIG to true, we override MAX_BLOCK_MS_CONFIG with Long.MAX_VALUE and warn about the deprecation.
    5. If the user sets BLOCK_ON_BUFFER_FULL_CONFIG to true and also sets METADATA_FETCH_TIMEOUT_CONFIG, we will not honor METADATA_FETCH_TIMEOUT_CONFIG. We will warn the user about this and recommend setting MAX_BLOCK_MS_CONFIG explicitly.
  2. In 0.9, we will remove TIMEOUT_CONFIG, METADATA_FETCH_TIMEOUT_CONFIG and BLOCK_ON_BUFFER_FULL_CONFIG from the producer configuration. Producer will send ProducerRequest V1.
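The override rules for the transition release can be sketched as below. This is an illustration only: DeprecatedTimeouts and resolve are hypothetical names, the string keys are assumed to be the values behind the config constants named above, and the sketch assumes already-typed values (Boolean, Long) rather than raw strings.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: DeprecatedTimeouts is a hypothetical helper, not Kafka code.
final class DeprecatedTimeouts {

    static Map<String, Object> resolve(Map<String, Object> userConfig) {
        Map<String, Object> resolved = new HashMap<>(userConfig);

        // Assumed key for TIMEOUT_CONFIG: overrides the request timeout.
        if (userConfig.containsKey("timeout.ms")) {
            warn("timeout.ms is deprecated; use request.timeout.ms");
            resolved.put("request.timeout.ms", userConfig.get("timeout.ms"));
        }

        boolean blockOnBufferFull = Boolean.TRUE.equals(userConfig.get("block.on.buffer.full"));
        boolean hasMetadataTimeout = userConfig.containsKey("metadata.fetch.timeout.ms");

        if (blockOnBufferFull) {
            warn("block.on.buffer.full is deprecated; use max.block.ms");
            if (hasMetadataTimeout) {
                // Both set: the metadata timeout is not honored.
                warn("metadata.fetch.timeout.ms is ignored because block.on.buffer.full=true");
            }
            resolved.put("max.block.ms", Long.MAX_VALUE);
        } else if (hasMetadataTimeout) {
            warn("metadata.fetch.timeout.ms is deprecated; use max.block.ms");
            resolved.put("max.block.ms", userConfig.get("metadata.fetch.timeout.ms"));
        }
        return resolved;
    }

    private static void warn(String message) {
        System.err.println("WARN: " + message);
    }
}
```

Handling block.on.buffer.full first makes the precedence explicit: when both deprecated settings are present, max.block.ms ends up as Long.MAX_VALUE.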

Rejected Alternatives

Option 1

  1. max.buffer.full.block.ms - To replace block.on.buffer.full. The max time to block when the buffer is full.
  2. metadata.fetch.timeout.ms - reuse the metadata timeout as batch.timeout.ms, because a batch stuck in the accumulator essentially means metadata is not available.
  3. network.request.timeout.ms - This timeout is used when the producer sends requests to brokers through TCP connections. It specifies how long the producer should wait for the response.

Migration for max.buffer.full.block.ms: in 0.8.2.2 or 0.8.3, we will add max.buffer.full.block.ms and leave block.on.buffer.full there. If the user sets block.on.buffer.full=true, we will set max.buffer.full.block.ms=Long.MAX_VALUE and show a deprecation warning. In 0.9, we will remove block.on.buffer.full. The changes should be backward compatible.

With the above approach, we can achieve the following.

  • send() needs metadata and buffer space to be available, so we can have a bounded blocking time for send() = (1) + (2), not considering customized partitioners and serializers.
  • The time from send() returning until the response is received is generally bounded by linger.ms + (2) + (3), not taking retries into consideration.

The benefits of this option are:

  • Users can tune the producer timeouts at a finer granularity.
  • Configuration scope and usage are cleaner and more precise compared with option 2. (It does not commit to a per-message timeout while actually imposing the timeout of some messages on others.)

The downsides of this option are:

  • Need to educate users about metadata and the buffer.
  • The end-to-end timeout of a message is less explicit; it is affected by retries, linger.ms, retry backoff time, request timeout, etc.

Option 2

  1. max.enqueue.block.ms - the maximum time producer.send() will block, including:
    1. metadata fetch time
    2. buffer full block time
    3. serialization time (customized serializer)
    4. partitioning time (customized partitioner)
  2. request.timeout.ms - the maximum time to wait for the response of a message after message has been appended to the accumulator. Including:
    1. linger.ms
    2. actual network RTT
    3. server replication time
    4. retries

(1) could alternatively be exposed through a new send() API, but this might be misleading, as users might take it to be the overall timeout for the message.

(2) is tricky to implement. We need to solve the following issues:

Issue 1: Once a message is in the accumulator, we lose per message control.

Possible solution: One implementation is to always use the latest expiry time among the messages in a batch as the batch expiry time. In this case, a message might be expired a little late, but will never be expired prematurely. The difference might be up to linger.ms. Similarly, after batches are drained into a request, we use the latest expiry time of all batches in the request as the expiry time of the request.
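The roll-up of expiry times described above can be sketched as follows. ExpiryRollup and latest are hypothetical names, and the timestamps are made-up illustrative values.

```java
import java.util.Arrays;
import java.util.List;

// Sketch only: ExpiryRollup is a hypothetical helper, not Kafka code.
final class ExpiryRollup {

    /** Returns the latest expiry time, so that nothing in the group expires prematurely. */
    static long latest(List<Long> expiryTimesMs) {
        if (expiryTimesMs.isEmpty()) {
            throw new IllegalArgumentException("need at least one expiry time");
        }
        long latest = Long.MIN_VALUE;
        for (long expiryMs : expiryTimesMs) {
            latest = Math.max(latest, expiryMs);
        }
        return latest;
    }

    public static void main(String[] args) {
        // Batch expiry = latest expiry of the messages in the batch.
        long batchA = latest(Arrays.asList(1_000L, 1_004L, 1_010L));
        long batchB = latest(Arrays.asList(1_002L, 1_007L));
        // Request expiry = latest expiry of the batches in the request.
        long request = latest(Arrays.asList(batchA, batchB));
        System.out.println("batchA=" + batchA + " batchB=" + batchB + " request=" + request);
    }
}
```

In this example the message expiring at 1000 is held until 1010, which illustrates the "a little late but never premature" trade-off.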

Issue 2: If there are retries, ideally request.timeout.ms should be at least linger.ms + replication.timeout.ms (which we can probably remove from user config) + retries * retry.backoff.ms. This sanity check might be difficult to explain to users.

Possible solution: We only enforce request.timeout.ms > linger.ms + some default replication.timeout.ms. We fail the batch when either retries are exhausted or request.timeout.ms is reached, whichever comes first.

The benefits of this option are:

  • Straightforward for users (assuming they do not care about messages expiring a little late).
  • Has a more explicit bound for send() and for how long it takes a message to be sent or failed.

The downsides of this option are:

  • Less accurate expiry time - a message can be timed out anywhere between (2) and 2*(2) + (1).
  • Might be a little difficult to tune for some users (e.g. some users might be willing to wait for the buffer but not for metadata).