Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-6608

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In KAFKA-4879, we first noticed that KafkaConsumer could hang forever during a position() call. On closer inspection, we found that several KafkaConsumer methods block indefinitely until the offsets for the provided TopicPartition are retrieved. This is part of a long history of indefinite blocking behavior in KafkaConsumer, which has been a continual cause of user frustration. This KIP aims to fix this problem in two ways:

  1. We introduce a new configuration, default.api.timeout.ms, to control the maximum time that the current blocking APIs will wait before raising a timeout error.
  2. We add overloaded APIs that take an extra Duration timeout parameter, which bounds the amount of time spent in the method. This is intended for advanced usage such as in Kafka Streams.

In both cases, a TimeoutException is thrown once the time spent exceeds the timeout. poll() and close() behave differently, as described in their sections below.

Some care must be taken in the case of poll(), since many applications depend on its current behavior of blocking until an assignment is found. This KIP retains this behavior for poll(long) but introduces a new API, poll(Duration), which always returns when the timeout expires. We will deprecate poll(long) and remove it in a later major release.

Public Interfaces

Unless otherwise specified, the pre-existing variants of the new methods will be marked @Deprecated.

This KIP adds default.api.timeout.ms as a new consumer configuration, which controls the default timeout for methods that do not accept a timeout as an argument. The default value of default.api.timeout.ms is one minute.
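As an illustration of where the new setting lives, the sketch below builds consumer Properties that set default.api.timeout.ms alongside request.timeout.ms. The two key strings are real consumer configuration names; the helper class ConsumerTimeoutConfig and the group id are hypothetical and exist only for this example.

```java
import java.time.Duration;
import java.util.Properties;

// Sketch only: the config key strings are real consumer settings, but this
// helper class and the group id below are hypothetical.
final class ConsumerTimeoutConfig {
    private ConsumerTimeoutConfig() {}

    static Properties withTimeouts(String bootstrapServers, Duration defaultApiTimeout, Duration requestTimeout) {
        if (defaultApiTimeout.isNegative() || requestTimeout.isNegative()) {
            throw new IllegalArgumentException("Timeouts must be non-negative");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", "example-group"); // hypothetical group id
        // Upper bound for blocking calls that take no timeout argument (KIP default: one minute).
        props.setProperty("default.api.timeout.ms", Long.toString(defaultApiTimeout.toMillis()));
        // Bound on a single request; this KIP proposes lowering its default to 30 seconds.
        props.setProperty("request.timeout.ms", Long.toString(requestTimeout.toMillis()));
        return props;
    }
}
```

The resulting Properties could then be handed to a KafkaConsumer constructor together with key and value deserializers.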

Below we document the APIs that this timeout will impact and how the behavior changes.

The following APIs currently block indefinitely until either the operation completes successfully or an unrecoverable error is encountered. Following this KIP, these methods will instead raise org.apache.kafka.common.errors.TimeoutException if neither of these conditions has been reached before the time specified by default.api.timeout.ms expires.


Code Block
void commitSync();


void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);


long position(TopicPartition partition);


OffsetAndMetadata committed(TopicPartition partition);
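The contract above (block until success, an unrecoverable error, or expiry of the timeout) can be pictured as a deadline loop. This is a minimal sketch under stated assumptions, not KafkaConsumer's implementation: the injected clock and the attempt callback are hypothetical stand-ins for the consumer's network polling, and the nested TimeoutException stands in for org.apache.kafka.common.errors.TimeoutException.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

// Minimal sketch of a deadline-bounded blocking call; not KafkaConsumer's code.
// The clock and the attempt callback are hypothetical stand-ins for network polling.
final class BoundedWait {
    /** Local stand-in for org.apache.kafka.common.errors.TimeoutException. */
    static final class TimeoutException extends RuntimeException {
        TimeoutException(String message) {
            super(message);
        }
    }

    private BoundedWait() {}

    /**
     * Runs attempt repeatedly until isDone returns true. Throws TimeoutException if the
     * operation is still incomplete once timeout has elapsed on the injected clock.
     */
    static void awaitOrTimeout(BooleanSupplier isDone, Runnable attempt, LongSupplier clockMs, Duration timeout) {
        long deadline = clockMs.getAsLong() + timeout.toMillis();
        while (!isDone.getAsBoolean()) {
            if (clockMs.getAsLong() >= deadline) {
                throw new TimeoutException("Operation did not complete within " + timeout.toMillis() + " ms");
            }
            attempt.run(); // one bounded step, e.g. a single network poll
        }
    }
}
```

Injecting the clock keeps the sketch deterministic; in the consumer, the same budget would be measured against wall-clock time.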


The following APIs currently block for at most the time configured by request.timeout.ms until the operation completes successfully or an unrecoverable error is encountered. Following this KIP, they will instead use the timeout indicated by default.api.timeout.ms. As before, when the timeout is reached, org.apache.kafka.common.errors.TimeoutException will be raised to the user.

Code Block
List<PartitionInfo> partitionsFor(String topic);

Map<String, List<PartitionInfo>> listTopics();

Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);

Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions);

Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions);


The current default value of request.timeout.ms for the consumer is just over five minutes (305 seconds). It is intentionally set higher than max.poll.interval.ms, which controls how long a rebalance can take and how long a JoinGroup request will be held in purgatory on the broker. In this KIP, we propose to change the default value of request.timeout.ms to 30 seconds. The JoinGroup API will be treated as a special case, and its timeout will be set to a value derived from max.poll.interval.ms. All other request types will use the timeout configured by request.timeout.ms.

As mentioned above, this KIP also adds overloaded APIs to allow for custom timeout control. The new APIs are documented below: 

Consumer#position

A TimeoutException will be thrown when the time spent exceeds timeout:

Code Block
    /**
     * Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
     * This method may issue a remote call to the server if there is no current position for the given partition.
     * <p>
     * This call will block until the position can be determined, an unrecoverable error is encountered
     * (in which case it is thrown to the caller), or the timeout expires.
     *
     * @param partition The partition to get the position for
     * @param timeout   The maximum duration of the method
     *
     * @return The current position of the consumer (that is, the offset of the next record to be fetched)
     * @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer
     * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for
     *             the partition
     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
     *             function is called
     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
     *             this function is called
     * @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
     *             configured groupId. See the exception for more details
     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
     */
    long position(TopicPartition partition, Duration timeout);

Consumer#committed and Consumer#commitSync

Similarly, timeout-bounded overloads will also be added for the other KafkaConsumer methods that block indefinitely.

Code Block
    /**
     * Get the last committed offset for the given partition (whether the commit happened by this process or
     * another). This offset will be used as the position for the consumer in the event of a failure.
     * <p>
     * This call will block to do a remote call to get the latest committed offsets from the server.
     *
     * @param partition The partition to check
     * @param timeout   The maximum duration of the method
     *
     * @return The last committed offset and metadata or null if there was no prior commit
     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
     *             function is called
     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
     *             this function is called
     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
     *             configured groupId. See the exception for more details
     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
     * @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
     */
    OffsetAndMetadata committed(TopicPartition partition, final Duration timeout);

    /**
     * Commit the specified offsets for the specified list of topics and partitions.
     * <p>
     * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
     * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
     * should not be used. The committed offset should be the next message your application will consume,
     * i.e. lastProcessedMessageOffset + 1.
     * <p>
     * This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is
     * encountered (in which case it is thrown to the caller), or the timeout expires.
     * <p>
     * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
     * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
     *
     * @param offsets  A map of offsets by partition with associated metadata
     * @param timeout  Maximum duration to block
     *
     * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
     *             This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
     *             or if there is an active group with the same groupId which is using group management.
     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
     *             function is called
     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
     *             this function is called
     * @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
     *             configured groupId. See the exception for more details
     * @throws java.lang.IllegalArgumentException if the committed offset is negative
     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
     *             is too large or if the topic does not exist).
     */
    void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout);

    /**
     * Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and
     * partitions.
     * <p>
     * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
     * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
     * should not be used.
     * <p>
     * This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is
     * encountered (in which case it is thrown to the caller), or the passed timeout expires.
     * <p>
     * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
     * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
     *
     * @param timeout  Maximum duration to block
     *
     * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
     *             This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
     *             or if there is an active group with the same groupId which is using group management.
     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
     *             function is called
     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
     *             this function is called
     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
     *             configured groupId. See the exception for more details
     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
     *             is too large or if the topic does not exist).
     * @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion
     *             of the offset commit
     */
    void commitSync(final Duration timeout);

Currently, commitSync does not accept a user-provided timeout and, by default, blocks indefinitely because it uses Long.MAX_VALUE as its wait time. To avoid a potentially hanging call, the new KafkaConsumer#commitSync overloads accept a user-specified timeout.

Consumer#poll

The pre-existing variant poll(long timeout) would block indefinitely for metadata updates if they were needed, then it would issue a fetch and poll for timeout ms for new records. The initial indefinite metadata block caused applications to become stuck when the brokers became unavailable. The existence of the timeout parameter made the indefinite block especially unintuitive.

We will add a new method poll(Duration timeout) with the semantics:

  1. iff a metadata update is needed:
    1. send (asynchronous) metadata requests
    2. poll for metadata responses (counts against timeout)
      • if no response within timeout, return an empty collection immediately
  2. if there is fetch data available, return it immediately
  3. if there is no fetch request in flight, send fetch requests
  4. poll for fetch responses (counts against timeout)
    • if no response within timeout, return an empty collection (leaving async fetch request for the next poll)
    • if we get a response, return the response

We will deprecate the original method, poll(long timeout), and we will not change its semantics, so it remains:

  1. iff a metadata update is needed:
    1. send (asynchronous) metadata requests
    2. poll for metadata responses indefinitely until we get it
  2. if there is fetch data available, return it immediately
  3. if there is no fetch request in flight, send fetch requests
  4. poll for fetch responses (counts against timeout)
    • if no response within timeout, return an empty collection (leaving async fetch request for the next poll)
    • if we get a response, return the response
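The two step lists above differ only in how the metadata wait is bounded. The following hypothetical model, not consumer code, drives both variants with a scripted fake network on a virtual clock: poll(Duration) charges the metadata wait and the fetch wait against one budget and returns an empty list on expiry, while poll(long) waits for metadata without bound. ScriptedNetwork and its method names are invented for this sketch, and step 2 (returning already-buffered data) is omitted for brevity.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the poll(Duration) and poll(long) control flow.
// ScriptedNetwork is a fake with a virtual clock, not a Kafka class.
final class PollModel {
    private PollModel() {}

    /** Fake network: responses become available at fixed virtual times (ms). */
    static final class ScriptedNetwork {
        private long now = 0;
        private final long metadataReadyAtMs;
        private final long fetchReadyAtMs;
        private boolean haveMetadata = false;
        private boolean fetchInFlight = false;
        int fetchesSent = 0;

        ScriptedNetwork(long metadataReadyAtMs, long fetchReadyAtMs) {
            this.metadataReadyAtMs = metadataReadyAtMs;
            this.fetchReadyAtMs = fetchReadyAtMs;
        }

        long nowMs() { return now; }

        /** Waits up to maxWaitMs for metadata; returns true if it is available. */
        boolean awaitMetadata(long maxWaitMs) {
            long waitUntil = saturatedAdd(now, maxWaitMs);
            if (metadataReadyAtMs <= waitUntil) {
                now = Math.max(now, metadataReadyAtMs);
                haveMetadata = true;
            } else {
                now = waitUntil;
            }
            return haveMetadata;
        }

        /** Sends a fetch only if none is already in flight. */
        void ensureFetchSent() {
            if (!fetchInFlight) {
                fetchInFlight = true;
                fetchesSent++;
            }
        }

        /** Waits up to maxWaitMs for the in-flight fetch; an unanswered fetch stays in flight. */
        List<String> awaitFetch(long maxWaitMs) {
            long waitUntil = saturatedAdd(now, maxWaitMs);
            if (fetchInFlight && fetchReadyAtMs <= waitUntil) {
                now = Math.max(now, fetchReadyAtMs);
                fetchInFlight = false;
                return Collections.singletonList("record@" + fetchReadyAtMs);
            }
            now = waitUntil;
            return Collections.emptyList();
        }
    }

    /** poll(Duration): metadata and fetch waits share one budget; never throws on timeout. */
    static List<String> pollDuration(ScriptedNetwork net, Duration timeout) {
        if (timeout.isNegative()) throw new IllegalArgumentException("Timeout must not be negative");
        long start = net.nowMs();
        long budget = timeout.toMillis();
        if (!net.haveMetadata && !net.awaitMetadata(budget)) {
            return Collections.emptyList(); // no metadata within the timeout
        }
        long remaining = Math.max(0, budget - (net.nowMs() - start));
        net.ensureFetchSent();
        return net.awaitFetch(remaining);
    }

    /** Deprecated poll(long): metadata wait is unbounded; only the fetch wait honors timeoutMs. */
    static List<String> pollLong(ScriptedNetwork net, long timeoutMs) {
        if (timeoutMs < 0) throw new IllegalArgumentException("Timeout must not be negative");
        while (!net.haveMetadata && !net.awaitMetadata(Long.MAX_VALUE)) {
            // keep waiting: this is the indefinite block that poll(Duration) removes
        }
        net.ensureFetchSent();
        return net.awaitFetch(timeoutMs);
    }

    /** Adds two non-negative values, capping at Long.MAX_VALUE instead of overflowing. */
    private static long saturatedAdd(long a, long b) {
        long r = a + b;
        return r < a ? Long.MAX_VALUE : r;
    }
}
```

For example, with metadata arriving at 0 ms and records at 500 ms, a 100 ms poll(Duration) returns empty but leaves its fetch in flight, and a following 500 ms poll returns the records without sending a second fetch.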

One notable usage is prohibited by the new poll: previously, you could call poll(0) to block for metadata updates, for example to initialize the client, supposedly without fetching records. Note, though, that this behavior is not according to any contract, and there is no guarantee that poll(0) won't return records the first time it's called. Therefore, it has always been unsafe to ignore the response. In other words, poll() currently has two use cases: to block on initial assignment metadata (and not poll for records), and to poll for records. We'll discard the first (unintentional) use case and truly enforce the timeout in poll for both metadata and data.


Note that poll() doesn't throw a TimeoutException because its async semantics are well defined. I.e., it is well defined to return an empty response when there's no data available, and it's designed to be called repeatedly to check for data (hence the name).

...

We will mark the existing poll(long) method as deprecated.

Consumer#partitionsFor

Code Block
/**
 * Get metadata about the partitions for a given topic. This method will issue a remote call to the server if it
 * does not already have any metadata about the given topic.
 *
 * @param topic The topic to get partition metadata for
 * @param timeout The maximum time this operation will block
 *
 * @return The list of partitions
 * @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
 * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
 *             function is called
 * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
 *             this function is called
 * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
 * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic. See the exception for more details
 * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before
 *             expiration of the configured request timeout
 * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
 */
List<PartitionInfo> partitionsFor(String topic, Duration timeout);

Consumer#listTopics

Code Block
/**
 * Get metadata about partitions for all topics that the user is authorized to view. This method will issue a
 * remote call to the server.
 *
 * @param timeout The maximum time this operation will block
 *
 * @return The map of topics and their partitions
 * @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
 * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
 *             function is called
 * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
 *             this function is called
 * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before
 *             expiration of the configured request timeout
 * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
 */
Map<String, List<PartitionInfo>> listTopics(Duration timeout);

 

Consumer#offsetsForTimes

Code Block
/**
 * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
 * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
 *
 * This is a blocking call. The consumer does not have to be assigned the partitions.
 * If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null
 * will be returned for that partition.
 *
 * @param timestampsToSearch the mapping from partition to the timestamp to look up.
 * @param timeout The maximum time this operation will block
 *
 * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
 *         than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
 *         such message.
 * @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
 * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
 * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
 * @throws IllegalArgumentException if the target timestamp is negative
 * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
 *         expiration of the configured {@code request.timeout.ms}
 * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not support looking up
 *         the offsets by timestamp
 */
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout);

 

Consumer#beginningOffsets

Code Block
/**
 * Get the first offset for the given partitions.
 * <p>
 * This method does not change the current consumer position of the partitions.
 *
 * @see #seekToBeginning(Collection)
 *
 * @param partitions the partitions to get the earliest offsets.
 * @param timeout The maximum time this operation will block
 *
 * @return The earliest available offsets for the given partitions
 * @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
 * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
 * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
 * @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before
 *         expiration of the configured {@code request.timeout.ms}
 */
Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout);

 

Consumer#endOffsets

Code Block
/**
 * Get the end offsets for the given partitions. In the default {@code read_uncommitted} isolation level, the end
 * offset is the high watermark (that is, the offset of the last successfully replicated message plus one). For
 * {@code read_committed} consumers, the end offset is the last stable offset (LSO), which is the minimum of
 * the high watermark and the smallest offset of any open transaction. Finally, if the partition has never been
 * written to, the end offset is 0.
 *
 * <p>
 * This method does not change the current consumer position of the partitions.
 *
 * @see #seekToEnd(Collection)
 *
 * @param partitions the partitions to get the end offsets.
 * @param timeout The maximum time this operation will block
 *
 * @return The end offsets for the given partitions.
 * @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
 * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
 * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
 * @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before
 *         expiration of the configured {@code request.timeout.ms}
 */
Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout);

 

Consumer#close

Notes:

  • close() is an existing variant with no parameters that applies a default from config. This variant will NOT be deprecated, because it is called by the Closeable interface.
  • close(long, TimeUnit) also exists as a variant. This one WILL be deprecated in favor of the new close(Duration) variant for consistency.
  • The existing semantics of close are not to throw a TimeoutException. Instead, after waiting for the timeout, it forces itself closed.

...

Code Block
/**
 * Tries to close the consumer cleanly within the specified timeout. This method waits up to
 * {@code timeout} for the consumer to complete pending commits and leave the group.
 * If auto-commit is enabled, this will commit the current offsets if possible within the
 * timeout. If the consumer is unable to complete offset commits and gracefully leave the group
 * before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be
 * used to interrupt close.
 *
 * @param timeout The maximum time to wait for consumer to close gracefully. The value must be
 *                non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
 *
 * @throws IllegalArgumentException If the {@code timeout} is negative.
 * @throws InterruptException If the thread is interrupted before or while this function is called
 * @throws org.apache.kafka.common.KafkaException for any other error during close
 */
public void close(Duration timeout);
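The notes above say close differs from the other blocking methods: when the budget runs out it forces itself closed rather than raising TimeoutException. A hypothetical model of that contract follows; CloseModel, its Outcome enum, and the injected pendingDone, step, and clock callbacks are illustrative assumptions, not consumer internals. A zero timeout with pending work closes forcibly without waiting, and a negative timeout is rejected as the Javadoc states.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

// Hypothetical model of the close(Duration) contract: unlike the other blocking
// methods, running out of time leads to a forced close, not a TimeoutException.
final class CloseModel {
    enum Outcome { GRACEFUL, FORCED }

    private CloseModel() {}

    /**
     * pendingDone reports whether pending commits and the leave-group request are
     * finished; step performs one unit of that work; clockMs is an injected clock.
     */
    static Outcome close(BooleanSupplier pendingDone, Runnable step, LongSupplier clockMs, Duration timeout) {
        if (timeout.isNegative()) {
            throw new IllegalArgumentException("The timeout cannot be negative.");
        }
        long deadline = clockMs.getAsLong() + timeout.toMillis();
        while (!pendingDone.getAsBoolean()) {
            if (clockMs.getAsLong() >= deadline) {
                return Outcome.FORCED; // give up on pending work and release resources anyway
            }
            step.run();
        }
        return Outcome.GRACEFUL;
    }
}
```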

 

Compatibility, Deprecation, and Migration Plan

Since the signatures of the existing methods will not be modified, pre-existing applications and frameworks built on the consumer will not be affected at the API level. However, some of these methods will be deprecated in favor of methods that are bound by a specific time limit.

The introduction of default.api.timeout.ms causes a slight change of behavior since some of the blocking APIs will now raise TimeoutException rather than their current blocking behavior. The change is compatible with the current API since TimeoutException is a KafkaException. Additionally, since TimeoutException is retriable, any existing retry logic will work as expected. 
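The compatibility argument rests on the exception hierarchy: code that already catches KafkaException, or retries on RetriableException, also covers the new TimeoutException. To run without the kafka-clients jar, the sketch below uses simplified local stand-ins that mirror the real class hierarchy; the retry helper callWithRetries is hypothetical.

```java
import java.util.function.Supplier;

// Simplified, hypothetical stand-ins mirroring Kafka's hierarchy:
// TimeoutException -> RetriableException -> ApiException -> KafkaException -> RuntimeException.
final class RetryCompat {
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
    }
    static class ApiException extends KafkaException {
        ApiException(String message) { super(message); }
    }
    static class RetriableException extends ApiException {
        RetriableException(String message) { super(message); }
    }
    static class TimeoutException extends RetriableException {
        TimeoutException(String message) { super(message); }
    }

    private RetryCompat() {}

    /** Pre-existing style retry loop: retries retriable errors, rethrows everything else. */
    static <T> T callWithRetries(Supplier<T> operation, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RetriableException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.get();
            } catch (RetriableException e) {
                last = e; // the new TimeoutException lands here without any code change
            }
        }
        throw last; // non-null: every failed attempt assigned it
    }
}
```

A loop written before this KIP therefore retries a call that hit default.api.timeout.ms, while a non-retriable KafkaException still propagates on the first attempt.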

Feasible and Rejected Alternatives

Please see KIP-288 for other rejected alternatives.

...