...

  1. Clear timeout configurations - Configurations should be intuitive and do exactly what they mean.
  2. Have bounded blocking time for send() - users should be able to bound the time they block when they call send().
  3. No message/batch/request will be able to reside in the producer without a configurable timeout - currently batches can sit in the accumulator for a long time (KAFKA-1788), and requests can be in flight until TCP timeout (KAFKA-2120). In this KIP, we want to see how we can make sure messages/batches/requests are expired.

Public Interfaces

From discussion on the mailing list and within LinkedIn, we now have three options for the end state of timeout configuration for KafkaProducer:

Option 1

...

public static final String NETWORK_REQUEST_TIMEOUT_CONFIG = "network.request.timeout.ms";
private static final String NETWORK_REQUEST_TIMEOUT_DOC = "The configuration controls the maximum amount of time the producer will wait for the response of a request. If the "
                                              + "response is not received before the timeout elapses the producer will resend the request if necessary or fail the request if retries are exhausted.";

If a request times out and retries have not been exhausted, the request will be retried.

When a request times out and retries are exhausted, an org.apache.kafka.common.errors.TimeoutException will be put in the returned future of the request and the callback will be fired.

2. There is an existing configuration TIMEOUT_CONFIG, which is really a server side timeout specifying the maximum amount of time brokers can wait before sending back the response. It is a little strange to have this configuration on the client side. We are going to remove this configuration from the producer side and have it on the broker side. It can be a log configuration.

3. To let users have a bounded blocking time for producer.send(), we are going to change block.on.buffer.full to max.buffer.full.block.ms.

Discussion on end state of timeouts in producer

We want to see if we can reduce the configurations exposed to users, so that there is less confusion while still keeping the flexibility to support different use cases.

A timeout might be needed in the following three scenarios, so we will have at least three timeout configurations.

A. when user calls producer.send()

B. batches in accumulator

C. after request is sent

Currently we have the following timeouts in the producer configuration, with the scenario in parentheses.

 

  • (A) metadata.fetch.timeout.ms - The maximum time producer will wait for metadata to be ready in producer.send()
  • (C) replication.timeout.ms (to be removed) - This is a server side configuration. It defines how long a server will wait for the records to be replicated to followers.

replication.timeout.ms is a legacy configuration.

We are adding another configuration:

  • (C) network.request.timeout.ms - This timeout is used when producer sends request to brokers through TCP connections. It specifies how long the producer should wait for the response.

From what I can see, network.request.timeout.ms has a clear purpose and should be a standalone configuration.

We also want to achieve the following:

  • For (A) we want to have a bounded blocking time for the send() method.
  • For (B) we need another timeout to expire the batches in the accumulator, in addition to the timeout used for batching.

With the above, we have the following configuration proposal for the end state:

  • (A) max.buffer.full.block.ms - To replace block.on.buffer.full. The max time to block when the buffer is full.
    • Use case: bound the blocking time of send() together with metadata.fetch.timeout.ms.
  • (A/B) metadata.fetch.timeout.ms - reuse the metadata timeout to expire batches in the accumulator, because a batch stuck there essentially means metadata is not available.
    • Use case 1: bound the time to wait for metadata in send().
    • Use case 2: prevent batches without metadata from sitting in the accumulator for too long.
  • (C) network.request.timeout.ms - the max time to wait for a response.
    • Use case: to tolerate a slow server without waiting forever for a response when a broker is down.
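
As a rough illustration of this proposed end state, the three timeouts could be collected as below. This is only a sketch: the key names are the ones proposed in the list above and are not guaranteed to exist in any released producer, and the class name `Option1TimeoutConfig` is hypothetical.

```java
import java.util.Properties;

// Hypothetical helper: collects the three timeouts proposed for Option 1.
final class Option1TimeoutConfig {
    static Properties build(long maxBufferFullBlockMs,
                            long metadataFetchTimeoutMs,
                            long networkRequestTimeoutMs) {
        Properties props = new Properties();
        // (A) max time send() may block while the buffer is full
        props.setProperty("max.buffer.full.block.ms", Long.toString(maxBufferFullBlockMs));
        // (A/B) max wait for metadata in send(), reused to expire batches in the accumulator
        props.setProperty("metadata.fetch.timeout.ms", Long.toString(metadataFetchTimeoutMs));
        // (C) max time to wait for a response after a request has been sent
        props.setProperty("network.request.timeout.ms", Long.toString(networkRequestTimeoutMs));
        return props;
    }
}
```

For example, `Option1TimeoutConfig.build(1000, 2000, 3000)` yields a `Properties` object with the three keys set; the numbers are arbitrary.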

From the user's perspective:

...


  1. max.buffer.full.block.ms   - To replace block.on.buffer.full. The max time to block when buffer is full.
  2. metadata.fetch.timeout.ms  - reuse the metadata timeout as batch.timeout.ms, because a batch stuck in the accumulator essentially means metadata is not available.
  3. network.request.timeout.ms - This timeout is used when producer sends request to brokers through TCP connections. It specifies how long the producer should wait for the response.

With the above approach, we can achieve the following:

  • send() needs metadata and buffer space to be available, so the blocking time of send() is bounded by (1) + (2), not counting customized partitioners and serializers.
  • The time from send() returning until the response is received is generally bounded by linger.ms + (2) + (3), not taking retries into consideration.
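
The two bounds above are plain sums, which the following sketch spells out. The class and method names are hypothetical; as in the text, retries and the time spent in customized partitioners or serializers are ignored.

```java
// Hypothetical helper computing the two Option 1 bounds stated in the text.
final class Option1Bounds {
    // Bound on send() blocking: (1) max.buffer.full.block.ms + (2) metadata.fetch.timeout.ms.
    static long sendBlockingBoundMs(long maxBufferFullBlockMs, long metadataFetchTimeoutMs) {
        return Math.addExact(maxBufferFullBlockMs, metadataFetchTimeoutMs);
    }

    // Bound from send() returning to response: linger.ms + (2) + (3) network.request.timeout.ms.
    static long afterSendBoundMs(long lingerMs,
                                 long metadataFetchTimeoutMs,
                                 long networkRequestTimeoutMs) {
        return Math.addExact(Math.addExact(lingerMs, metadataFetchTimeoutMs),
                             networkRequestTimeoutMs);
    }
}
```

`Math.addExact` is used so that very large settings (such as Long.MAX_VALUE) fail loudly instead of silently wrapping around.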

The benefits of this option are:

  • Users can tune the producer timeouts at a finer granularity.
  • Configuration scope and usage are cleaner and more precise compared with option 2. (It does not commit to a per-message timeout while actually shoehorning the timeout of some messages onto others.)

The downsides of this option are:

  • Need to educate users about metadata and buffer.
  • The timeout of a message is less explicit, because it is affected by retries, linger.ms, retry backoff time, request timeout, etc.

Option 2

  1. max.enqueue.block.ms - the maximum time producer.send() will block, including:
    1. metadata fetch time
    2. buffer full block time
    3. serialization time (customized serializer)
    4. partitioning time (customized partitioner)
  2. request.timeout.ms - the maximum time to wait for the response of a message after the message has been appended to the accumulator, including:
    1. linger.ms
    2. actual network RTT
    3. server replication time
    4. retries

(1) can alternatively be exposed through a new send() API, but this might be misleading, as users might think of it as the overall timeout for the message.

(2) is tricky to implement. We need to solve the following issues:

Issue 1: Once a message is in the accumulator, we lose per message control.

Possible solution: One implementation is to always use the latest expiry time among the messages in a batch as the batch expiry time. In this case, a message might be expired a little later than requested, but will never be expired prematurely. The difference might be up to linger.ms. Similarly, after batches are drained into a request, we use the latest expiry time of all batches in the request as the expiry time of the request.
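
A minimal sketch of this "latest expiry wins" rule, applied once per batch and once per request, might look as follows. The class `ExpiryAggregation` and its method are hypothetical and only illustrate the aggregation; they are not part of the producer.

```java
import java.util.Collection;

// Hypothetical helper: the expiry of a container is the latest expiry of its members.
final class ExpiryAggregation {
    // Works for messages within a batch and for batches within a request.
    static long latestExpiryMs(Collection<Long> memberExpiriesMs) {
        if (memberExpiriesMs.isEmpty()) {
            throw new IllegalArgumentException("at least one member expiry is required");
        }
        long latest = Long.MIN_VALUE;
        for (long expiryMs : memberExpiriesMs) {
            latest = Math.max(latest, expiryMs);
        }
        return latest;
    }
}
```

With message expiries 1000, 1003 and 1005 in one batch, the batch expires at 1005; a request that carries this batch together with one expiring at 1010 expires at 1010. No message expires early, but the first message is held up to 10 ms longer than its own expiry.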

Issue 2: If there are retries, ideally request.timeout.ms should be at least linger.ms + replication.timeout.ms (which we can probably remove from user config) + retries * retry.backoff.ms. This sanity check might be difficult to explain to users.

Possible solution: We only enforce request.timeout.ms > linger.ms + some default replication.timeout.ms. We fail the batch when either retries are exhausted or request.timeout.ms is reached, whichever comes first.
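
The relaxed sanity check and the "whichever comes first" rule could be sketched like this. All names are hypothetical, and the sketch assumes the timeout is measured from the time the message was appended to the accumulator, as option 2 describes.

```java
// Hypothetical sketch of the Option 2 sanity check and batch failure rule.
final class Option2RequestTimeoutCheck {
    // Enforce request.timeout.ms > linger.ms + a default replication timeout.
    static void validate(long requestTimeoutMs, long lingerMs, long defaultReplicationTimeoutMs) {
        if (requestTimeoutMs <= lingerMs + defaultReplicationTimeoutMs) {
            throw new IllegalArgumentException(
                "request.timeout.ms must be greater than linger.ms + replication timeout");
        }
    }

    // True once request.timeout.ms has elapsed since the append.
    static boolean timedOut(long appendTimeMs, long nowMs, long requestTimeoutMs) {
        return nowMs - appendTimeMs >= requestTimeoutMs;
    }

    // Called after a failed attempt: fail the batch if no retries remain
    // or the timeout has been reached, whichever happens first.
    static boolean failAfterFailedAttempt(int retriesRemaining, long appendTimeMs,
                                          long nowMs, long requestTimeoutMs) {
        return retriesRemaining <= 0 || timedOut(appendTimeMs, nowMs, requestTimeoutMs);
    }
}
```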

The benefits of this option are:

  • Straightforward for users (assuming they do not care about messages expiring slightly late).
  • Has a more explicit bound for send() and for how long it takes until a message is sent or failed.

The downsides of this option are:

  • Less accurate expiry time - a message can be timed out anywhere between (2) and 2*(2) + (1).
  • Might be a little difficult to tune for some users (e.g. some users might be willing to wait for buffer but not for metadata).

Option 3

Similar to option 2, but with a slightly different request.timeout.ms:

  1. max.enqueue.block.ms - the maximum time producer.send() will block, including:
    1. metadata fetch time
    2. buffer full block time
    3. serialization time (customized serializer)
    4. partitioning time (customized partitioner)
  2. request.timeout.ms - the maximum time to wait for the response of a message AFTER the batch is ready, including:
    1. actual network RTT
    2. server replication time

The differences between option 2 and option 3 are:

  1. Option 2 starts counting down when messages get into the queue. Option 3 starts counting down only after a batch is ready.
  2. Request timeout in option 2 includes retries. Request timeout in option 3 does not include retries - each try gets a full request.timeout.ms. The reason we did not include retries here is that if the request timeout included retries, we should actually keep retrying until the timeout. That implies removing the retry config, which causes problems for users who don't want duplicates. It is also a little strange to time out before the user-specified retries are exhausted.

In addition, the request timeout also applies to batches in the accumulator that are ready but not drained because metadata is missing - we are reusing the request timeout to time out batches in the accumulator.

Among the above three options, we prefer option 3, for the following reasons:

  1. Only two configurations are needed.
  2. It meets all the motivations.
  3. Easy for users to understand and easy to implement (all the batches in the same request will have the same timeout).
  4. The per-message timeout is easy to compute - linger.ms + retries * request.timeout.ms.
  5. Good configuration isolation between components. The request timeout will be a network client configuration and can be passed in to the accumulator.
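
The per-message timeout from reason 4 can be written down directly. The sketch below follows the formula exactly as stated; the class name is hypothetical, and retry.backoff.ms is not counted because the formula does not mention it.

```java
// Hypothetical helper: per-message timeout under Option 3, as stated in the list above.
final class Option3MessageTimeout {
    // linger.ms + retries * request.timeout.ms
    static long perMessageTimeoutMs(long lingerMs, int retries, long requestTimeoutMs) {
        return Math.addExact(lingerMs, Math.multiplyExact((long) retries, requestTimeoutMs));
    }
}
```

If "retries" is read as the number of attempts after the initial one, one more request.timeout.ms may apply for the first attempt; the sketch deliberately keeps to the formula as written.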

NOTE: In all the configurations above, replication timeout has been taken off the list according to a separate discussion thread on the mailing list. We are going to use the request timeout in ProducerRequest.

 

Taking option 3, we need to have the following changes:

1. Add the following new configuration to org.apache.kafka.clients.producer.ProducerConfig and org.apache.kafka.clients.consumer.ConsumerConfig:

public static final String REQUEST_TIMEOUT_CONFIG = "network.request.timeout.ms";
private static final String REQUEST_TIMEOUT_DOC = "The configuration controls the maximum amount of time the producer will wait for the response of a request. If the response is not received before the timeout elapses the producer will resend the request if necessary or fail the request if retries are exhausted.";

If a request times out and retries have not been exhausted, the request will be retried.

When a request times out and retries are exhausted, an org.apache.kafka.common.errors.TimeoutException will be put in the returned future of the request and the callback will be fired.
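
The retry-or-fail behaviour on timeout can be sketched as below. To stay self-contained, the example uses java.util.concurrent.TimeoutException as a stand-in for org.apache.kafka.common.errors.TimeoutException, and `PendingRequest` is a hypothetical type, not a producer class.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Hypothetical sketch of what happens when a request times out.
final class RequestTimeoutHandling {
    // Stand-in for an in-flight request with its future, callback and retry budget.
    static final class PendingRequest {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        final Consumer<Exception> callback;
        int retriesLeft;

        PendingRequest(int retriesLeft, Consumer<Exception> callback) {
            this.retriesLeft = retriesLeft;
            this.callback = callback;
        }
    }

    // Returns true if the request should be resent, false if it has been failed.
    static boolean onTimeout(PendingRequest request) {
        if (request.retriesLeft > 0) {
            request.retriesLeft--; // retries not exhausted: retry the request
            return true;
        }
        // Retries exhausted: put the exception in the future and fire the callback.
        TimeoutException error = new TimeoutException("request timed out, retries exhausted");
        request.future.completeExceptionally(error);
        request.callback.accept(error);
        return false;
    }
}
```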

2. Add max.enqueue.timeout.ms to bound the send() block time.

public static final String MAX_ENQUEUE_TIMEOUT_MS_CONFIG = "max.enqueue.timeout.ms";
private static final String MAX_ENQUEUE_TIMEOUT_DOC = "The configuration controls how long {@link KafkaProducer#send()} will block. The send method can be blocked because of buffer full, metadata not available, customized partitioner/serializer.";
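
One way to make a single max.enqueue.timeout.ms cover every blocking phase of send() is to share one time budget across the phases. The following is only a sketch of that idea; `EnqueueBudget` is a hypothetical class, and how the real send() would split its waiting is not specified in this document.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical time budget shared by all blocking phases of send().
final class EnqueueBudget {
    private final long maxEnqueueTimeoutMs;
    private final long startNanos;

    EnqueueBudget(long maxEnqueueTimeoutMs, long startNanos) {
        this.maxEnqueueTimeoutMs = maxEnqueueTimeoutMs;
        this.startNanos = startNanos;
    }

    // Time left for the next blocking phase; 0 means the budget is used up.
    long remainingMs(long nowNanos) {
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(nowNanos - startNanos);
        return Math.max(0L, maxEnqueueTimeoutMs - elapsedMs);
    }
}
```

A caller would create the budget with `System.nanoTime()` when send() starts, pass `remainingMs(System.nanoTime())` as the wait bound to the metadata wait and then to the buffer allocation, and fail the send once it returns 0. Subtracting elapsed time from the configured value, rather than precomputing a deadline, keeps the arithmetic safe even when the timeout is Long.MAX_VALUE.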

3. Remove METADATA_FETCH_TIMEOUT_CONFIG, TIMEOUT_CONFIG, BLOCK_ON_BUFFER_FULL_CONFIG

...


Proposed Changes

Because the NetworkClient works asynchronously, we need to keep track of the send time of each request. The implementation will do the following:

...
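
The send-time bookkeeping mentioned at the start of this section could look roughly like the sketch below. It does not reproduce the elided steps: `InFlightRequestTracker` and its methods are hypothetical and are not the NetworkClient API. The sketch assumes send times are recorded in non-decreasing order, so the oldest request is always at the head of the queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical bookkeeping for in-flight requests, keyed by send time.
final class InFlightRequestTracker {
    private static final class Entry {
        final int correlationId;
        final long sendTimeMs;

        Entry(int correlationId, long sendTimeMs) {
            this.correlationId = correlationId;
            this.sendTimeMs = sendTimeMs;
        }
    }

    private final Deque<Entry> inFlight = new ArrayDeque<>();
    private final long requestTimeoutMs;

    InFlightRequestTracker(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    // Record the send time; callers must pass non-decreasing timestamps.
    void onSend(int correlationId, long nowMs) {
        inFlight.addLast(new Entry(correlationId, nowMs));
    }

    // A response arrived in time, so the request no longer needs tracking.
    void onResponse(int correlationId) {
        inFlight.removeIf(e -> e.correlationId == correlationId);
    }

    // Remove and return the ids of requests that have waited at least requestTimeoutMs.
    List<Integer> expire(long nowMs) {
        List<Integer> expired = new ArrayList<>();
        while (!inFlight.isEmpty()
                && nowMs - inFlight.peekFirst().sendTimeMs >= requestTimeoutMs) {
            expired.add(inFlight.pollFirst().correlationId);
        }
        return expired;
    }
}
```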

In most cases, metadata refresh should be able to pick up the new leader if a broker is down. If a broker is not down but just slow, we should not see a dramatic increase in TCP connections as long as the request timeout is set to a reasonable value.

...

Compatibility, Deprecation, and Migration Plan

To remove these configurations, we will do the following:

...

  1. Modify server side code to take both V0 and V1 of producer request.
  2. In 0.8.2.2 or 0.8.3, we will leave TIMEOUT_CONFIG there. Producer will still send producer request V0.
    1. Keep all the deprecated configuration there.
    2. If user sets TIMEOUT_CONFIG, we override REQUEST_TIMEOUT_CONFIG and show a deprecation warning and advertise the new configuration.
    3. If user sets METADATA_FETCH_TIMEOUT_CONFIG, we override MAX_ENQUEUE_TIMEOUT_CONFIG with METADATA_FETCH_TIMEOUT_CONFIG and warn about the deprecation.
    4. If user sets BLOCK_ON_BUFFER_FULL_CONFIG to true, we override the MAX_ENQUEUE_TIMEOUT_CONFIG with Long.MAX_VALUE and warn about the deprecation.
  3. In 0.9, we will remove TIMEOUT_CONFIG from the producer configuration. Producer will send ProducerRequest V1.

To replace block.on.buffer.full with max.buffer.full.block.ms:

  1. In 0.8.2.2 or 0.8.3, we will add max.buffer.full.block.ms and leave block.on.buffer.full there. If user sets block.on.buffer.full=true, we will set max.buffer.full.block.ms=Long.MAX_VALUE and show a deprecation warning.
  2. In 0.9, we will remove the block.on.buffer.full
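
The override rules for the deprecated configurations can be sketched as a translation step applied to the user's settings. Everything here is illustrative: the class is hypothetical, "timeout.ms" is assumed to be the string value of TIMEOUT_CONFIG, and the new key names are taken from the constants proposed earlier in this document. The text does not say which override wins when both metadata.fetch.timeout.ms and block.on.buffer.full=true are set; the sketch applies them in the listed order, so Long.MAX_VALUE wins.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical translation of deprecated producer timeouts to the proposed ones.
final class DeprecatedTimeoutConfigs {
    static Map<String, String> translate(Map<String, String> userConfig, List<String> warnings) {
        // Deprecated keys are kept in the result, as during the transition release.
        Map<String, String> result = new HashMap<>(userConfig);

        String timeout = userConfig.get("timeout.ms"); // assumed value of TIMEOUT_CONFIG
        if (timeout != null) {
            result.put("network.request.timeout.ms", timeout);
            warnings.add("timeout.ms is deprecated; use network.request.timeout.ms");
        }

        String metadataFetch = userConfig.get("metadata.fetch.timeout.ms");
        if (metadataFetch != null) {
            result.put("max.enqueue.timeout.ms", metadataFetch);
            warnings.add("metadata.fetch.timeout.ms is deprecated; use max.enqueue.timeout.ms");
        }

        // parseBoolean(null) is false, so an unset flag changes nothing.
        if (Boolean.parseBoolean(userConfig.get("block.on.buffer.full"))) {
            result.put("max.enqueue.timeout.ms", Long.toString(Long.MAX_VALUE));
            warnings.add("block.on.buffer.full is deprecated; use max.enqueue.timeout.ms");
        }
        return result;
    }
}
```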


...

  1. TIMEOUT_CONFIG, METADATA_FETCH_TIMEOUT_CONFIG and BLOCK_ON_BUFFER_FULL_CONFIG.

Rejected Alternatives

batch.timeout.ms - In some cases, the leader broker information of a batch could be unavailable after the batch is ready to be sent (i.e. the batch is full or has reached linger.ms). This configuration defines the maximum time the producer will wait for the necessary broker information before it fails the batches.