Status

Current state: Discuss

Discussion thread: here

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In the old producer/consumer, we have a socket timeout associated with each request: the producer/consumer sends a request and then waits for a response. If no response is received from a broker within the specified timeout, the request fails.

...

This KIP only tries to solve the problem after a request has been constructed. For the new producer specifically, there are situations where record batches can sit in the accumulator for a very long time before they are drained out of RecordAccumulator (KAFKA-1788). Since the handling of those situations is more state-based than time-based, we are not trying to address them together in this KIP.

We want to achieve the following with these timeout adjustments:

  1. Clear timeout configurations - Configurations should be intuitive and do exactly what they mean.
  2. Have bounded blocking time for send() - users should be able to have a bounded blocking time when they call send().
  3. No message/batch/request should be able to reside in the producer without a configurable timeout - currently batches can sit in the accumulator for a long time (KAFKA-1788), and requests can be in flight until the TCP timeout (KAFKA-2120). In this KIP, we want to see how we can make sure messages/batches/requests expire.

Public Interfaces

1. Add the following new configuration to org.apache.kafka.clients.producer.ProducerConfig and org.apache.kafka.clients.consumer.ConsumerConfig:

...

When a request times out and retries are exhausted, an org.apache.kafka.common.errors.TimeoutException will be put in the returned future of the request and the callback will be fired.
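The behaviour described above can be sketched roughly as follows. This is a minimal, dependency-free illustration and not the producer's actual code: the class RequestTimeoutSketch, its send method, and the attemptGotResponse predicate are all hypothetical, and java.util.concurrent.TimeoutException stands in for org.apache.kafka.common.errors.TimeoutException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.IntPredicate;

// Hypothetical sketch; names are illustrative and not part of the Kafka client.
final class RequestTimeoutSketch {

    /**
     * Tries a request up to (1 + retries) times.
     * attemptGotResponse stands in for "a response arrived within the
     * request timeout" for the given attempt number (0-based).
     */
    static CompletableFuture<String> send(int retries,
                                          IntPredicate attemptGotResponse,
                                          BiConsumer<String, Exception> callback) {
        CompletableFuture<String> future = new CompletableFuture<>();
        for (int attempt = 0; attempt <= retries; attempt++) {
            if (attemptGotResponse.test(attempt)) {
                String ack = "acked on attempt " + attempt;
                future.complete(ack);
                callback.accept(ack, null);
                return future;
            }
        }
        // Every attempt timed out: fail the future and fire the callback.
        // Stand-in for org.apache.kafka.common.errors.TimeoutException.
        TimeoutException error =
            new TimeoutException("request timed out after " + (retries + 1) + " attempts");
        future.completeExceptionally(error);
        callback.accept(null, error);
        return future;
    }
}
```

With retries set to 2 and a predicate that never succeeds, the returned future completes exceptionally and the callback receives the timeout exception once, after the third attempt.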

2. There is an existing configuration TIMEOUT_CONFIG, which is really a server side timeout specifying the maximum amount of time brokers can wait before sending back the response. In order to avoid future confusion, for KafkaProducer, we might also want to change TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG. It is a little bit strange to have this configuration on the client side. We are going to remove this configuration from the producer side and have it on the broker side. It can be a log configuration.

3. To give users a bounded blocking time for producer.send(), we are going to replace block.on.buffer.full with max.buffer.full.block.ms.

Discussion on end state of timeouts in producer

We want to see if we can reduce the number of configurations exposed to users, so that there is less confusion while we still have the flexibility to support different use cases.

...

C. after request is sent

Currently we might have the following timeouts in the producer configuration, with the scenario each applies to in parentheses.

  • (A) metadata.fetch.timeout.ms - The maximum time producer will wait for metadata to be ready in producer.send()
  • (C) replication.timeout.ms (to be removed) - This is a server side configuration. It defines how long a server will wait for the records to be replicated to followers.

replication.timeout.ms is a legacy configuration.

We are adding another configuration:

  • (C) network.request.timeout.ms - This timeout is used when the producer sends requests to brokers over TCP connections. It specifies how long the producer should wait for a response.

From what I can see, network.request.timeout.ms has a clear purpose and should be a standalone configuration, so we probably want to keep it. replication.timeout.ms is a legacy configuration; we might be able to get rid of it, but let's keep it for now.

We also want to achieve the following:

  • For (A) we want to have a bounded blocking time for the send() method.
  • For (B) we need another timeout to expire the batches in the accumulator, in addition to the timeout used for batching.

...

      4. (B) batch.timeout.ms - In some cases, the leader broker information of a batch could be unavailable after the batch is ready to be sent (i.e. the batch is full or has reached linger.ms). This configuration defines the maximum time the producer will wait for the necessary broker information before it fails the batches.

...

...

      5. (A) buffer.full.block.ms - This timeout defines how long the producer.send() method can block (we will enforce this for both metadata and blocking on buffer full in a separate ticket).

With the above, we have the following configuration proposal for the end state:

  • (A) max.buffer.full.block.ms - To replace block.on.buffer.full. The max time to block when the buffer is full.
    • Use case: bound the blocking time of send() together with metadata.fetch.timeout.ms.
  • (A/B) metadata.fetch.timeout.ms - reuse the metadata timeout as batch.timeout.ms for batches in the accumulator, because the underlying problem is essentially that metadata is not available.
    • Use case 1: bound the time to wait for metadata in send().
    • Use case 2: prevent batches without metadata from sitting in the accumulator for too long.
  • (C) network.request.timeout.ms - the max time to wait for a response.
    • Use case: to tolerate some slow server but not wait for a response forever when a broker is down.
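As a rough illustration of the end-state proposal above, the three settings could be supplied as plain properties. The keys are the names proposed in this KIP draft. The values are assumptions chosen for the example, except that 60000 matches the 60 seconds suggested for the request timeout elsewhere in this page. The class and the worstCaseSendBlockMs helper are hypothetical, and the helper assumes that the metadata wait and the buffer wait happen one after the other, which this page does not state explicitly.

```java
import java.util.Properties;

// Hypothetical sketch of the proposed end-state producer timeouts.
final class ProposedTimeoutConfigSketch {

    static Properties proposedEndState() {
        Properties props = new Properties();
        // (A) max time send() may block when the buffer is full (illustrative value)
        props.setProperty("max.buffer.full.block.ms", "5000");
        // (A/B) max time to wait for metadata, also used to expire batches
        // in the accumulator that have no metadata (illustrative value)
        props.setProperty("metadata.fetch.timeout.ms", "60000");
        // (C) max time to wait for a response once a request has been sent
        props.setProperty("network.request.timeout.ms", "60000");
        return props;
    }

    // Assumption: metadata wait and buffer wait are sequential, so the
    // worst-case blocking time of send() is their sum.
    static long worstCaseSendBlockMs(Properties props) {
        long metadataMs = Long.parseLong(props.getProperty("metadata.fetch.timeout.ms"));
        long bufferMs = Long.parseLong(props.getProperty("max.buffer.full.block.ms"));
        return metadataMs + bufferMs;
    }
}
```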

From user's perspective:

...

  1. Two things are needed to send a message to a topic: metadata of the topic and enough buffer space. send() will block if either of them is not available.
  2. If metadata of a topic is not available for metadata.fetch.timeout.ms, the messages for that topic will fail.
  3. When a message leaves the producer for the broker, the producer will expect a response within network.request.timeout.ms. If no response is received, the producer will retry the message.

Proposed Changes

Because the NetworkClient works asynchronously, we need to keep track of the send time for each request. So the implementation will do the following:

...

The request timeout will be set to a reasonable value, say 60 seconds.
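One possible way to keep track of send times and detect expired requests is sketched below. It is not the NetworkClient implementation and does not reproduce the steps elided above: the class InFlightTimeoutSketch and all of its members are hypothetical, and the sketch assumes that timestamps passed to onSend never decrease, so the oldest request is always at the head of the queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch; not the actual NetworkClient bookkeeping.
final class InFlightTimeoutSketch {

    private static final class InFlightRequest {
        final int correlationId;
        final long sendTimeMs;

        InFlightRequest(int correlationId, long sendTimeMs) {
            this.correlationId = correlationId;
            this.sendTimeMs = sendTimeMs;
        }
    }

    private final long requestTimeoutMs;
    // Oldest request first, assuming onSend is called with non-decreasing times.
    private final Deque<InFlightRequest> inFlight = new ArrayDeque<>();

    InFlightTimeoutSketch(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    // Record the send time when a request goes out.
    void onSend(int correlationId, long nowMs) {
        inFlight.addLast(new InFlightRequest(correlationId, nowMs));
    }

    // Stop tracking a request once its response arrives.
    void onResponse(int correlationId) {
        inFlight.removeIf(r -> r.correlationId == correlationId);
    }

    // Remove and return the ids of requests whose timeout has been reached.
    List<Integer> expire(long nowMs) {
        List<Integer> expired = new ArrayList<>();
        while (!inFlight.isEmpty()
                && nowMs - inFlight.peekFirst().sendTimeMs >= requestTimeoutMs) {
            expired.add(inFlight.pollFirst().correlationId);
        }
        return expired;
    }
}
```

Calling expire on every poll of the client would give a timeout check that does not depend on the socket: a request sent at time 0 with a 60 second timeout is returned by expire(60000) but not by expire(59999).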

Actions after request timeout

When the request timeout has been reached, we do the following:

...

In most cases, the metadata refresh should be able to pick up the new leader if a broker is down. If a broker is not down but just slow, we should not see a dramatic increase in TCP connections as long as the request timeout is set to a reasonable value.

Plan to deprecate TIMEOUT_CONFIG

To remove TIMEOUT_CONFIG and replace it with REPLICATION_TIMEOUT_CONFIG, we will do the following:

  1. Add a new ProducerRequest_V1 to remove replication timeout ms.
  2. Modify server side code to take both V0 and V1 of producer request.
  3. In 0.8.2.2 or 0.8.3, we will add REPLICATION_TIMEOUT_CONFIG but leave TIMEOUT_CONFIG there. The producer will still send producer request V0. If the user sets TIMEOUT_CONFIG, we show a deprecation warning, use TIMEOUT_CONFIG to override REPLICATION_TIMEOUT_CONFIG, and advertise the server side replication timeout configuration.
  4. In 0.9, we will remove TIMEOUT_CONFIG from the producer configuration. Producer will send ProducerRequest V1.

To replace block.on.buffer.full with max.buffer.full.block.ms:

  1. In 0.8.2.2 or 0.8.3, we will add max.buffer.full.block.ms and leave block.on.buffer.full there. If the user sets block.on.buffer.full=true, we will set max.buffer.full.block.ms=Long.MAX_VALUE and show a deprecation warning.
  2. In 0.9, we will remove block.on.buffer.full.
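The transition rule in step 1 could look roughly like the sketch below. The class, the method name resolveMaxBufferFullBlockMs, and the defaultMs parameter are hypothetical. The handling of block.on.buffer.full=false (fall through to the new setting or the default) is an assumption, since this page only specifies the true case.

```java
import java.util.Properties;

// Hypothetical sketch of mapping the deprecated setting onto the new one.
final class BufferFullBlockCompatSketch {

    static long resolveMaxBufferFullBlockMs(Properties props, long defaultMs) {
        String legacy = props.getProperty("block.on.buffer.full");
        if (Boolean.parseBoolean(legacy)) {
            // Legacy "block forever" behaviour maps to an unbounded wait.
            System.err.println(
                "WARN: block.on.buffer.full is deprecated; use max.buffer.full.block.ms");
            return Long.MAX_VALUE;
        }
        // Assumption: when the legacy flag is absent or false, use the new
        // setting if present, otherwise the caller-supplied default.
        String explicit = props.getProperty("max.buffer.full.block.ms");
        return explicit != null ? Long.parseLong(explicit) : defaultMs;
    }
}
```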

Compatibility, Deprecation, and Migration Plan

The changes should be backward compatible.

Rejected Alternatives

batch.timeout.ms - In some cases, the leader broker information of a batch could be unavailable after the batch is ready to be sent (i.e. the batch is full or has reached linger.ms). This configuration defines the maximum time the producer will wait for the necessary broker information before it fails the batches.