Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Update KIP for default.api.timeout.ms

...

  1. We introduce a new configuration maxdefault.api.blocktimeout.ms to control the maximum time that current blocking APIs will await before raising a timeout error.
  2. We add overloaded APIs to allow for custom timeout control. This is intended for advanced usage such as in Kafka Streams

...

Public Interfaces

This KIP adds maxdefault.api.blocktimeout.ms as a new configuration for the consumer which controls the default timeout for methods which do not accept a timeout as an argument. The default value of maxdefault.api.blocktimeout.ms will be one minute, which matches the default for the same config in the Producer. 

Below we document the APIs that this timeout will impact and how the behavior changes.

The following APIs currently block indefinitely until either the operation completes successfully or an unrecoverable error is encountered. Following this KIP, these methods will now raise org.apache.kafka.common.errors.TimeoutException if neither of these conditions have been reached prior to expiration of the time specified by maxdefault.api.blocktimeout.ms.


Code Block
void commitSync();


void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);


long position(TopicPartition partition);


OffsetAndMetadata committed(TopicPartition partition);

...

The following APIs currently block for at most the time configured by request.timeout.ms until the operation completed successfully or an unrecoverable error is encountered. Following this KIP, they will instead use the timeout indicated by maxdefault.api.blocktimeout.ms. As before, when the timeout is reached, org.apache.kafka.common.errors.TimeoutException will be raised to the user.

Code Block
List<PartitionInfo> partitionsFor(String topic);

Map<String, List<PartitionInfo>> listTopics();

Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);

Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions);

Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions);

.common.errors.TimeoutException will be raised to the user.

Code Block
List<PartitionInfo> partitionsFor(String topic);

Map<String, List<PartitionInfo>> listTopics();

Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);

Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions);

Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions);


The current default timeout for the consumer is just over five minutes. It is intentionally set to a value higher than max.poll.interval.ms, which controls how long the rebalance can take and how long a JoinGroup request will be held in purgatory on the broker. In this KIP, we propose to change the default value of request.timeout.ms to 30 seconds. The JoinGroup API will be treated as a special case and its timeout will be set to a value derived from max.poll.interval.ms. All other request types will use the timeout configured by request.timeout.ms.

As mentioned above, this KIP also adds overloaded APIs to allow for custom timeout control. The new APIs are documented below: 

...

Since old methods will not be modified, preexisting data frameworks will not be affected. However, some of these methods will be deprecated in favor of methods which are bound by a specific time limit.

The introduction of maxdefault.api.blocktimeout.ms causes a slight change of behavior since some of the blocking APIs will now raise TimeoutException rather than their current blocking behavior. The change is compatible with the current API since TimeoutException is a KafkaException. Additionally, since TimeoutException is retriable, any existing retry logic will work as expected. 

...