
Status

Current state: Discuss

Discussion thread: here

JIRA: KAFKA-2120

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In the old producer/consumer, we have a socket timeout associated with each request: the producer/consumer sends a request and then waits for the response. If no response is received from the broker within the specified timeout, the request fails.

In the NetworkClient of the new producer/consumer, we currently don't have a similar timeout for requests. Adding a client-side request timeout to NetworkClient would be useful for the following reasons:

1. For KafkaProducer.close() and KafkaProducer.flush(), we need the request timeout as an implicit timeout.

2. Currently the producer sends requests in a non-blocking way and does not wait for a response. If a socket is disconnected, the network client will fail the requests with a NetworkException. However, it might take very long for a TCP connection to time out. Adding a client-side request timeout will help solve this kind of issue.

This KIP only tries to solve the problem after a request has been constructed. For the new producer specifically, there are situations where record batches can sit in the accumulator for a very long time before they are drained out of the RecordAccumulator (KAFKA-1788). Since the handling of those situations is state-based rather than time-based, we are not trying to address them in this KIP.

Public Interfaces

Add the following new configuration to org.apache.kafka.clients.producer.ProducerConfig and org.apache.kafka.clients.consumer.ConsumerConfig:

public static final String NETWORK_REQUEST_TIMEOUT_CONFIG = "network.request.timeout.ms";

private static final String NETWORK_REQUEST_TIMEOUT_DOC = "The configuration controls the maximum amount of time the producer will wait for the response of a request. If the "
                                              + "response is not received before the timeout elapses, the producer will resend the request if necessary or fail the request if retries are exhausted.";
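For illustration, here is how a user might set the new timeout when constructing a producer; the broker address and the 30-second value are placeholders for this example, not proposed defaults:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("network.request.timeout.ms", "30000");  // new client-side request timeout from this KIP
    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);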
If a request times out and retries have not been exhausted, the request will be retried.

When a request times out and retries are exhausted, an org.apache.kafka.common.errors.TimeoutException will be set in the future returned for the request, and the callback will be fired.
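A sketch of how the timeout would surface to user code, assuming the semantics above (the topic name and record values are placeholders):

    producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"), new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception instanceof org.apache.kafka.common.errors.TimeoutException) {
                // The request timed out and retries were exhausted; decide whether to
                // resend the record or propagate the error.
                System.err.println("Request timed out: " + exception.getMessage());
            }
        }
    });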

There is an existing configuration, TIMEOUT_CONFIG, which is really a server-side timeout specifying the maximum amount of time brokers can wait before sending back the response. To avoid future confusion, for KafkaProducer we might also want to rename TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG.

Discussion on end state of timeouts in producer

We want to see if we can reduce the number of configurations exposed to users, so there is less confusion, while still having the flexibility to support different use cases.

A timeout might be needed in the following three scenarios, so we will have at least three timeout configurations.

A. When the user calls producer.send()

B. While batches sit in the accumulator

C. After a request is sent

Currently we might have the following timeouts in the producer configuration, with the scenario in parentheses.

  1. (A) metadata.fetch.timeout.ms - The maximum time the producer will wait for metadata to be ready in producer.send().
  2. (C) replication.timeout.ms - This is a server-side configuration. It defines how long the server will wait for the records to be replicated to followers.
  3. (C) network.request.timeout.ms - This timeout is used when the producer sends requests to brokers over TCP connections. It specifies how long the producer should wait for the response.

From what I can see, network.request.timeout.ms has a clear purpose and we probably want to keep it.

replication.timeout.ms is a legacy configuration; we might be able to do away with it eventually, but let's keep it for now.

For (B) we need another timeout to expire the batches in the accumulator, beyond the linger time used for batching. That means we probably need the following configuration (numbering continued from the list above).

      4. (B) batch.timeout.ms - In some cases, the leader broker information for a batch could be unavailable after the batch is ready to be sent (i.e., the batch is full or has reached linger.ms). This configuration defines the maximum time the producer will wait for the necessary broker information before it fails the batches.

For (A) it makes sense to have a dedicated timeout for the send() method to provide a bound on blocking. That gives us another configuration.

      5. (A) max.send.block.ms - This timeout defines how long the producer.send() method can block (we will enforce this for both metadata fetching and blocking on buffer full in a separate ticket).

With the above, we have the following two proposals and want to see which one makes more sense:

Proposal 1:

  • (A/B) metadata.fetch.timeout.ms - reuse the metadata timeout as batch.timeout.ms, because batch expiry in the accumulator is essentially metadata being unavailable.
  • (C) replication.timeout.ms
  • (C) network.request.timeout.ms

Proposal 2:

  1. (A) max.send.block.ms
  2. (B) batch.timeout.ms
  3. (C) replication.timeout.ms
  4. (C) network.request.timeout.ms

Proposal 1 has three configurations but does not guarantee a bound on the blocking time of send() when blocking on buffer full is turned on. So the user needs to catch the exception outside of send() and retry if the buffer is full.

Proposal 2 has four configurations but guarantees a bound on the blocking time of send().
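For illustration only, a producer configured under Proposal 2 would set four timeouts. All of these names are proposals from this discussion rather than existing configurations, and the values are arbitrary examples:

    // All four names are proposals from this discussion, not existing configurations.
    props.put("max.send.block.ms", "10000");           // (A) bound on producer.send() blocking
    props.put("batch.timeout.ms", "60000");            // (B) expire batches stuck in the accumulator
    props.put("replication.timeout.ms", "30000");      // (C) server-side replication wait
    props.put("network.request.timeout.ms", "30000");  // (C) client-side request timeout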

Proposed Changes

Because the NetworkClient works in an async way, we need to keep track of the send time for each request. So the implementation will do the following (a simplified sketch follows the list):

  1. Associate a timestamp with each in-flight request and its retries.
  2. poll() will go through the in-flight requests and expire those requests if necessary.
  3. The timeout for poll() will need to take the request timeout of in-flight requests into account.
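A minimal, self-contained sketch of this bookkeeping; the class and method names are illustrative, not the actual NetworkClient internals:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    // Illustrative only: not the actual NetworkClient internals.
    class InFlightRequestTracker {
        private final long requestTimeoutMs;
        private final List<Long> sendTimesMs = new ArrayList<Long>();  // one entry per in-flight request

        InFlightRequestTracker(long requestTimeoutMs) {
            this.requestTimeoutMs = requestTimeoutMs;
        }

        // Step 1: record a timestamp when a request (or a retry) is sent.
        void onSend(long nowMs) {
            sendTimesMs.add(nowMs);
        }

        // Step 2: called from poll(); removes and counts requests whose timeout has elapsed.
        int expireRequests(long nowMs) {
            int expired = 0;
            Iterator<Long> it = sendTimesMs.iterator();
            while (it.hasNext()) {
                if (nowMs - it.next() > requestTimeoutMs) {
                    it.remove();
                    expired++;  // the caller disconnects, refreshes metadata, and retries (next section)
                }
            }
            return expired;
        }

        // Step 3: poll() should not block past the earliest pending request timeout.
        long pollTimeout(long nowMs, long defaultTimeoutMs) {
            long timeout = defaultTimeoutMs;
            for (long sendTimeMs : sendTimesMs)
                timeout = Math.min(timeout, Math.max(0L, sendTimeMs + requestTimeoutMs - nowMs));
            return timeout;
        }
    }

Here poll() would call expireRequests() on each iteration and use pollTimeout() to bound how long it blocks.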

The request timeout will be set to a reasonable value, say 60 seconds.

Actions after request timeout

When the request timeout has been reached, we do the following (a sketch follows the list):

  1. Disconnect the TCP connection.
  2. Refresh the metadata.
  3. Retry the request on the new TCP connection.
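A compact sketch of these three steps; the helper methods are hypothetical stand-ins for the real client internals:

    // Illustrative only: the recovery sequence for a single timed-out request.
    class RequestTimeoutHandler {
        void onRequestTimeout(String brokerId) {
            closeConnection(brokerId);  // 1. drop the TCP connection to the unresponsive broker
            refreshMetadata();          // 2. trigger a metadata refresh to discover a new leader
            retryRequest(brokerId);     // 3. resend the request once a new connection is established
        }

        // Hypothetical stand-ins for the real client internals:
        private void closeConnection(String brokerId) { /* close the socket to brokerId */ }
        private void refreshMetadata() { /* mark cluster metadata as needing an update */ }
        private void retryRequest(String brokerId) { /* re-enqueue the request for sending */ }
    }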

In most cases, the metadata refresh should be able to pick up the new leader if a broker is down. If a broker is not down but just slow, as long as the request timeout is set to a reasonable value, we should not see a dramatic increase in the number of TCP connections.

Plan to deprecate TIMEOUT_CONFIG

To replace TIMEOUT_CONFIG with REPLICATION_TIMEOUT_CONFIG, we will do the following:

  1. In 0.8.2.2 or 0.8.3, we will add REPLICATION_TIMEOUT_CONFIG but leave TIMEOUT_CONFIG in place. If the user sets TIMEOUT_CONFIG, we show a deprecation warning and use its value to override REPLICATION_TIMEOUT_CONFIG (see the sketch after this list).
  2. In 0.9, we will remove TIMEOUT_CONFIG from the configuration.
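One possible shape for the override logic in step 1, sketched with a hypothetical helper; "timeout.ms" is the existing value of TIMEOUT_CONFIG:

    // Illustrative only: prefer the legacy TIMEOUT_CONFIG during the deprecation window.
    static long resolveReplicationTimeoutMs(java.util.Map<String, Object> userConfigs,
                                            long replicationTimeoutMs) {
        Object legacy = userConfigs.get("timeout.ms");  // existing TIMEOUT_CONFIG key
        if (legacy != null) {
            System.err.println("Config timeout.ms is deprecated; use replication.timeout.ms instead.");
            return Long.parseLong(legacy.toString());   // legacy setting overrides the new one
        }
        return replicationTimeoutMs;
    }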

Compatibility, Deprecation, and Migration Plan

The changes should be backward compatible.

Rejected Alternatives

None
