...

  1. A complementary method will be added for each method that blocks indefinitely, taking an extra timeout parameter along with a TimeUnit, which together bound the amount of time spent in the method.
  2. A ClientTimeoutException will be thrown once the amount of time spent exceeds the timeout.

Public Interfaces

Position

A ClientTimeoutException will be thrown when the time spent exceeds timeout:

Code Block
languagejava
titleKafkaConsumer#position(TopicPartition partition, long timeout, TimeUnit unit)
    /**
     * Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
     * This method may issue a remote call to the server if there is no current position for the given partition.
     * <p>
     * This call will block until either the position could be determined or an unrecoverable error is
     * encountered (in which case it is thrown to the caller).
     *
     * @param partition The partition to get the position for
     * @param timeout   The maximum duration of the method
     * @param unit      The time unit of the timeout
     * @return The current position of the consumer (that is, the offset of the next record to be fetched)
     * @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer
     * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for
     *             the partition
     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
     *             function is called
     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
     *             this function is called
     * @throws org.apache.kafka.common.errors.ClientTimeoutException if the time spent blocking for offsets exceeds the timeout
     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
     *             configured groupId. See the exception for more details
     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
     */
    public long position(TopicPartition partition, long timeout, TimeUnit unit);
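
For illustration, a caller might use the new overload as shown below. This is only a sketch: it assumes the proposed position(TopicPartition, long, TimeUnit) overload and the proposed ClientTimeoutException, and the helper method shown (positionOrFallback) is hypothetical glue, not existing API.

Code Block
languagejava
titleCaller sketch: bounding a position() lookup

import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
// Hypothetical import: ClientTimeoutException is the exception proposed above;
// it does not exist in current releases.
import org.apache.kafka.common.errors.ClientTimeoutException;

public class PositionWithTimeoutSketch {

    /**
     * Sketch only: assumes the proposed position(TopicPartition, long, TimeUnit)
     * overload and the proposed ClientTimeoutException.
     */
    static long positionOrFallback(KafkaConsumer<String, String> consumer,
                                   TopicPartition partition,
                                   long fallbackOffset) {
        try {
            // Bound the lookup to 30 seconds instead of blocking indefinitely.
            return consumer.position(partition, 30, TimeUnit.SECONDS);
        } catch (ClientTimeoutException e) {
            // The position could not be determined in time; let the caller decide
            // whether to retry or fall back.
            return fallbackOffset;
        }
    }
}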

Committed and CommitSync

Similarly, this will also be applied to the other methods in KafkaConsumer that block indefinitely.

...

the new KafkaConsumer#commitSync will accept a user-specified timeout.
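
A sketch of what that overload could look like, following the same (timeout, TimeUnit) pattern as position() above; the exact signatures shown here are an assumption of this sketch, not a final interface:

Code Block
languagejava
titleKafkaConsumer#commitSync with timeout (sketch)
    /**
     * Commit offsets returned on the last poll() for the subscribed topics and partitions,
     * blocking for at most the given timeout.
     *
     * @param timeout The maximum duration of the method
     * @param unit    The time unit of the timeout
     * @throws org.apache.kafka.common.errors.ClientTimeoutException if the commit does not complete before the timeout expires
     */
    public void commitSync(long timeout, TimeUnit unit);

    /**
     * Commit the specified offsets, blocking for at most the given timeout.
     *
     * @param offsets A map of offsets by partition with associated metadata
     * @param timeout The maximum duration of the method
     * @param unit    The time unit of the timeout
     * @throws org.apache.kafka.common.errors.ClientTimeoutException if the commit does not complete before the timeout expires
     */
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, long timeout, TimeUnit unit);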

Proposed Changes

Poll

Additionally, poll() currently has two use cases: blocking on initial assignment metadata (without polling for records), and polling for records. We'll split these use cases, and truly enforce the timeout in poll at the same time, by adding two new methods:

 

Code Block
/**
 * Block until we have an assignment (and fetch offsets, etc.).
 * <p>
 * It is an error to not have subscribed to any topics or partitions before polling for data.
 * <p>
 * Throws a {@link TimeoutException} if the {@code maxBlockTime} expires before the operation completes, but it
 * is safe to try again.
 *
 * @param maxBlockTime The maximum time to block and poll for metadata updates
 *
 * @throws org.apache.kafka.common.errors.TimeoutException if the metadata update doesn't complete within the maxBlockTime
 * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called concurrently with this function
 * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted concurrently with this function
 * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if the offset for a partition or set of
 *             partitions is undefined or out of range and no offset reset policy has been configured
 * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
 * @throws org.apache.kafka.common.errors.AuthorizationException if caller lacks Read access to any of the subscribed
 *             topics or to the configured groupId. See the exception for more details
 * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. invalid groupId or
 *             session timeout, or any new error cases in future versions)
 * @throws java.lang.IllegalArgumentException if the timeout value is negative
 * @throws java.lang.IllegalStateException if the consumer is not subscribed to any topics or manually assigned any
 *             partitions to consume from
 */
public void awaitAssignmentMetadata(final Duration maxBlockTime);
 
 
/**
 * Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have
 * subscribed to any topics or partitions before polling for data.
 * <p>
 * On each poll, the consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last
 * consumed offset can be manually set through {@link #seek(TopicPartition, long)} or automatically set as the last committed
 * offset for the subscribed list of partitions.
 *
 * @param maxBlockTime The maximum time to block and poll for metadata updates or data.
 *
 * @return map of topic to records since the last fetch for the subscribed list of topics and partitions
 *
 * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if the offset for a partition or set of
 *             partitions is undefined or out of range and no offset reset policy has been configured
 * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
 *             function is called
 * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
 *             this function is called
 * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
 * @throws org.apache.kafka.common.errors.AuthorizationException if caller lacks Read access to any of the subscribed
 *             topics or to the configured groupId. See the exception for more details
 * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. invalid groupId or
 *             session timeout, errors deserializing key/value pairs, or any new error cases in future versions)
 * @throws java.lang.IllegalArgumentException if the timeout value is negative
 * @throws java.lang.IllegalStateException if the consumer is not subscribed to any topics or manually assigned any
 *             partitions to consume from
 */
public ConsumerRecords<K, V> poll(final Duration maxBlockTime);

 

We will mark the existing poll() method as deprecated.
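
For illustration, a caller that used poll(0) only to complete the initial group assignment could migrate roughly as follows. This is a sketch against the two proposed methods above: awaitAssignmentMetadata and poll(Duration) do not exist yet, and the topic name is made up.

Code Block
languagejava
titleMigration sketch: poll(0) -> awaitAssignmentMetadata + poll(Duration)

import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollMigrationSketch {

    static void consume(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("my-topic"));

        // Old pattern: poll(0) was often used just to block on the initial assignment,
        // discarding any returned records.
        // consumer.poll(0);

        // New pattern (proposed): block on assignment metadata for a bounded time;
        // a TimeoutException here is safe to retry.
        consumer.awaitAssignmentMetadata(Duration.ofSeconds(30));

        // Then poll for records with a bounded wait as well.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        records.forEach(record ->
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value()));
    }
}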

 


Note that in all cases, new methods are being added. The old methods will behave exactly as today, and will be marked "deprecated since 2.0" to provide a clean migration path. One change shall be made to KafkaConsumer#poll(), whose calls into Kafka internals (specifically fetchCommittedOffsets()) can block indefinitely. This is the same problem that other methods face (such as committed() and position()). To avoid blocking indefinitely, the new methods will take a user-specified timeout that defines the maximum amount of time for which a method blocks.

Regarding the policy of what happens when the time limit is exceeded:

1. The new KafkaConsumer#poll(), since it returns records, will return an empty ConsumerRecords response when the time limit is exceeded.

2. A ClientTimeoutException will be introduced to allow users to more clearly identify the reason why the method timed out (e.g. LeaderNotAvailable, RequestTimeout, etc.).
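
One possible shape for such an exception, shown purely as a sketch (the class layout, the reason field, and its values are assumptions of this example, not part of the proposal text):

Code Block
languagejava
titleClientTimeoutException (illustrative sketch only)
package org.apache.kafka.common.errors;

/**
 * Illustrative sketch only: shows how a client-side timeout exception could carry
 * the underlying reason (e.g. LeaderNotAvailable, RequestTimeout) so callers can
 * tell why the operation timed out. The actual class layout is not defined here.
 */
public class ClientTimeoutException extends RetriableException {

    private final String reason;

    public ClientTimeoutException(String message, String reason) {
        super(message);
        this.reason = reason;
    }

    /** A short hint about why the operation timed out, e.g. "LeaderNotAvailable". */
    public String reason() {
        return reason;
    }
}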

...

Note: In the current version, fetchCommittedOffsets() will block forever if the committed offsets cannot be fetched successfully, which affects position() and committed(). We need to break out of its internal while loop, as sketched below.
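
The general fix is to thread a deadline through that loop. A minimal sketch of the pattern follows; this is an illustration of the technique, not the actual Kafka internals, and the Attempt interface and retryUntil helper are hypothetical.

Code Block
languagejava
titleDeadline-bounded retry loop (pattern sketch, not actual Kafka internals)

import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.errors.TimeoutException;

public class BoundedRetrySketch {

    /** A single attempt that returns a result, or null if it has not succeeded yet. */
    interface Attempt<T> {
        T tryOnce();
    }

    /**
     * Replaces an unbounded "while (true) { retry }" with a loop that gives up once
     * the caller-supplied timeout has elapsed, mirroring the proposed behaviour.
     */
    static <T> T retryUntil(Attempt<T> attempt, long timeout, TimeUnit unit) {
        long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
        while (System.nanoTime() < deadlineNanos) {
            T result = attempt.tryOnce();
            if (result != null)
                return result;
        }
        // In the real consumer this would be the proposed ClientTimeoutException;
        // the existing TimeoutException is used here only to keep the sketch compilable.
        throw new TimeoutException("Gave up after " + timeout + " " + unit);
    }
}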

...