...

In KAFKA-4879, we first noticed that KafkaConsumer will hang forever during a position() call. However, after looking closer, we have found that several KafkaConsumer methods will continue to block indefinitely unless the offsets are retrieved for the provided TopicPartition. To prevent this scenario:

  1. A complementary method will be added for each method that blocks indefinitely, with an extra Duration parameter, timeout, which bounds the amount of time spent in the method.
  2. A TimeoutException will be thrown once the amount of time spent exceeds timeout.
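The two steps above can be sketched without any Kafka dependency. In this minimal, hedged example, `OffsetLookup` is a hypothetical stand-in for a client whose result arrives asynchronously, and `java.util.concurrent.TimeoutException` stands in for the Kafka exception type; neither is the proposed implementation.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a client whose lookup completes asynchronously.
final class OffsetLookup {
    private final CompletableFuture<Long> pending;

    OffsetLookup(CompletableFuture<Long> pending) {
        this.pending = pending;
    }

    // Pre-existing variant: blocks until the result arrives, however long that takes.
    @Deprecated
    long position() throws InterruptedException, ExecutionException {
        return pending.get();
    }

    // Complementary variant: gives up once `timeout` has elapsed.
    long position(Duration timeout)
            throws InterruptedException, ExecutionException, TimeoutException {
        return pending.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        OffsetLookup ready = new OffsetLookup(CompletableFuture.completedFuture(42L));
        System.out.println(ready.position(Duration.ofMillis(100)));

        // A future that never completes models the hang described above.
        OffsetLookup stuck = new OffsetLookup(new CompletableFuture<>());
        try {
            stuck.position(Duration.ofMillis(50));
        } catch (TimeoutException e) {
            System.out.println("timed out");
        }
    }
}
```

The caller decides how long a lookup is worth waiting for, and a timeout surfaces as an exception rather than a hung thread.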

Public Interfaces


Unless otherwise specified, the pre-existing variants for these new interfaces will be marked @Deprecated.

Consumer#Position

A TimeoutException will be thrown when the time spent exceeds timeout:

Code Block: KafkaConsumer#position(TopicPartition topicPartition)
    /**
     * Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
     * This method may issue a remote call to the server if there is no current position for the given partition.
     * <p>
     * This call will block until either the position could be determined or an unrecoverable error is
     * encountered (in which case it is thrown to the caller).
     *
     * @param partition The partition to get the position for
+    * @param timeout   The maximum duration of the method
     * @return The current position of the consumer (that is, the offset of the next record to be fetched)
     * @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer
     * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for
     *             the partition
     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
     *             function is called
     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
     *             this function is called
+    * @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
     *             configured groupId. See the exception for more details
     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
     */
+    public long position(TopicPartition partition, Duration timeout);

...

Consumer#committed and Consumer#commitSync

Similarly, this will also be applied to other methods in KafkaConsumer that block indefinitely.

Code Block: KafkaConsumer#blocking methods
    /**
     * Get the last committed offset for the given partition (whether the commit happened by this process or
     * another). This offset will be used as the position for the consumer in the event of a failure.
     * <p>
     * This call will block to do a remote call to get the latest committed offsets from the server.
     *
     * @param partition The partition to check
+    * @param timeout   The maximum duration of the method
     * @return The last committed offset and metadata or null if there was no prior commit
     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
     *             function is called
     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
     *             this function is called
     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
     *             configured groupId. See the exception for more details
     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+    * @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
     */
    public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout);

    /**
     * Commit the specified offsets for the specified list of topics and partitions.
     * <p>
     * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
     * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
     * should not be used. The committed offset should be the next message your application will consume,
     * i.e. lastProcessedMessageOffset + 1.
     * <p>
     * This is a synchronous commit and will block until either the commit succeeds or an unrecoverable error is
     * encountered (in which case it is thrown to the caller).
     * <p>
     * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
     * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
     *
     * @param offsets  A map of offsets by partition with associated metadata
+    * @param timeout  Maximum duration to block
     *
     * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
     *             This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
     *             or if there is an active group with the same groupId which is using group management.
     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
     *             function is called
     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
     *             this function is called
+    * @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
     *             configured groupId. See the exception for more details
     * @throws java.lang.IllegalArgumentException if the committed offset is negative
     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
     *             is too large or if the topic does not exist).
     */
    void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout);

Currently, commitSync does not accept a user-provided timeout and, by default, blocks indefinitely by setting the wait time to Long.MAX_VALUE. To accommodate a potentially hanging call, the new KafkaConsumer#commitSync will accept a user-specified timeout.
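One practical detail when a wait time of Long.MAX_VALUE and a user-specified Duration share a code path is arithmetic overflow: adding a start time to Long.MAX_VALUE wraps around. The sketch below shows one way to track the remaining time safely. `WaitBudget` and its methods are hypothetical names chosen for illustration and are not taken from the Kafka code base.

```java
import java.time.Duration;

// Hypothetical helper, not taken from the Kafka code base.
final class WaitBudget {
    private final long startMs;
    private final long timeoutMs;

    WaitBudget(long startMs, Duration timeout) {
        this.startMs = startMs;
        this.timeoutMs = toMillisSaturated(timeout);
    }

    // Duration#toMillis throws ArithmeticException on overflow; clamp instead.
    static long toMillisSaturated(Duration timeout) {
        try {
            return timeout.toMillis();
        } catch (ArithmeticException e) {
            return Long.MAX_VALUE;
        }
    }

    // Remaining time is derived from elapsed time, so no `start + timeout`
    // sum is ever formed and a Long.MAX_VALUE timeout cannot overflow.
    long remainingMs(long nowMs) {
        long elapsed = Math.max(0L, nowMs - startMs);
        return Math.max(0L, timeoutMs - elapsed);
    }

    boolean expired(long nowMs) {
        return remainingMs(nowMs) == 0L;
    }

    public static void main(String[] args) {
        WaitBudget unbounded = new WaitBudget(1_000L, Duration.ofMillis(Long.MAX_VALUE));
        System.out.println(unbounded.expired(5_000L));

        WaitBudget bounded = new WaitBudget(1_000L, Duration.ofMillis(500));
        System.out.println(bounded.remainingMs(1_200L));
    }
}
```

With this shape, an effectively unbounded wait and a bounded one differ only in the Duration passed in.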

Consumer#poll

Additionally, poll() currently has two use cases: to block on initial assignment metadata (and not poll for records), and to poll for records. We'll discard the first (unintentional) use case and truly enforce the timeout in poll for both metadata and data.

Note that poll() doesn't throw a TimeoutException because its async semantics are well defined. That is, returning an empty response when no data is available is well defined, and the method is designed to be called repeatedly to check for data (hence the name).
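The difference from the other methods can be illustrated with a small stand-alone sketch. Here a `BlockingQueue` plays the role of the record source; `PollSketch` is a hypothetical name, and this is only an analogy for the semantics described above, not the consumer's actual fetch logic.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical analogy for poll(Duration); not the consumer implementation.
final class PollSketch {
    // Waits up to `timeout` for the first record, then drains whatever else is ready.
    // Returns an empty list, rather than throwing, when nothing arrives in time.
    static List<String> poll(BlockingQueue<String> source, Duration timeout)
            throws InterruptedException {
        List<String> records = new ArrayList<>();
        String first = source.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
        if (first != null) {
            records.add(first);
            source.drainTo(records);
        }
        return records;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> source = new LinkedBlockingQueue<>();
        System.out.println(poll(source, Duration.ofMillis(20)).isEmpty());

        source.add("a");
        source.add("b");
        System.out.println(poll(source, Duration.ofMillis(20)));
    }
}
```

Because an empty result is a normal outcome, the caller simply loops and polls again.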

 

Code Block
/**
 * Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have
 * subscribed to any topics or partitions before polling for data.
 * <p>
 * On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last
 * consumed offset can be manually set through {@link #seek(TopicPartition, long)} or automatically set as the last committed
 * offset for the subscribed list of partitions
 *
 * @param timeout The maximum time to block and poll for metadata updates or data.
 *
 * @return map of topic to records since the last fetch for the subscribed list of topics and partitions
 *
 * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if the offset for a partition or set of
 *             partitions is undefined or out of range and no offset reset policy has been configured
 * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
 *             function is called
 * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
 *             this function is called
 * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
 * @throws org.apache.kafka.common.errors.AuthorizationException if caller lacks Read access to any of the subscribed
 *             topics or to the configured groupId. See the exception for more details
 * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. invalid groupId or
 *             session timeout, errors deserializing key/value pairs, or any new error cases in future versions)
 * @throws java.lang.IllegalArgumentException if the timeout value is negative
 * @throws java.lang.IllegalStateException if the consumer is not subscribed to any topics or manually assigned any
 *             partitions to consume from
 */
public ConsumerRecords<K, V> poll(final Duration timeout)

 

We will mark the existing poll() method as deprecated.

Consumer#partitionsFor

Code Block
/**
 * Get metadata about the partitions for a given topic. This method will issue a remote call to the server if it
 * does not already have any metadata about the given topic.
 *
 * @param topic The topic to get partition metadata for
 * @param timeout The maximum time this operation will block
 *
 * @return The list of partitions
 * @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
 * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
 *             function is called
 * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
 *             this function is called
 * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
 * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic. See the exception for more details
 * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before
 *             expiration of the configured request timeout
 * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
 */
List<PartitionInfo> partitionsFor(String topic, Duration timeout);

Consumer#listTopics

Code Block
/**
 * Get metadata about partitions for all topics that the user is authorized to view. This method will issue a
 * remote call to the server.
 *
 * @param timeout The maximum time this operation will block
 *
 * @return The map of topics and its partitions
 * @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
 * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
 *             function is called
 * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
 *             this function is called
 * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before
 *             expiration of the configured request timeout
 * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
 */
Map<String, List<PartitionInfo>> listTopics(Duration timeout)

 

Consumer#offsetsForTimes

Code Block
/**
 * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
 * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
 *
 * This is a blocking call. The consumer does not have to be assigned the partitions.
 * If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null
 * will be returned for that partition.
 *
 * @param timestampsToSearch the mapping from partition to the timestamp to look up.
 * @param timeout The maximum time this operation will block
 *
 * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
 *         than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
 *         such message.
 * @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
 * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
 * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
 * @throws IllegalArgumentException if the target timestamp is negative
 * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
 *         expiration of the configured {@code request.timeout.ms}
 * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not support looking up
 *         the offsets by timestamp
 */
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout)

Consumer#beginningOffsets

Code Block
/**
 * Get the first offset for the given partitions.
 * <p>
 * This method does not change the current consumer position of the partitions.
 *
 * @see #seekToBeginning(Collection)
 *
 * @param partitions the partitions to get the earliest offsets.
 * @param timeout The maximum time this operation will block
 *
 * @return The earliest available offsets for the given partitions
 * @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
 * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
 * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
 * @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before
 *         expiration of the configured {@code request.timeout.ms}
 */
Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout)

Consumer#endOffsets

Code Block
/**
 * Get the end offsets for the given partitions. In the default {@code read_uncommitted} isolation level, the end
 * offset is the high watermark (that is, the offset of the last successfully replicated message plus one). For
 * {@code read_committed} consumers, the end offset is the last stable offset (LSO), which is the minimum of
 * the high watermark and the smallest offset of any open transaction. Finally, if the partition has never been
 * written to, the end offset is 0.
 *
 * <p>
 * This method does not change the current consumer position of the partitions.
 *
 * @see #seekToEnd(Collection)
 *
 * @param partitions the partitions to get the end offsets.
 * @param timeout The maximum time this operation will block
 *
 * @return The end offsets for the given partitions.
 * @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
 * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
 * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
 * @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before
 *         expiration of the configured {@code request.timeout.ms}
 */
Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout)

Consumer#close

Notes:

  • close() already exists as a variant with no parameters and applies a default timeout from config. This variant will NOT be deprecated.
  • close(long, TimeUnit) also exists as a variant. This one WILL be deprecated in favor of the new close(Duration) variant for consistency.
  • The existing semantics of close are not to throw a TimeoutException. Instead, after waiting for the timeout, it forces itself closed.

Code Block
/**
 * Tries to close the consumer cleanly within the specified timeout. This method waits up to
 * {@code timeout} for the consumer to complete pending commits and leave the group.
 * If auto-commit is enabled, this will commit the current offsets if possible within the
 * timeout. If the consumer is unable to complete offset commits and gracefully leave the group
 * before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be
 * used to interrupt close.
 *
 * @param timeout The maximum time to wait for consumer to close gracefully. The value must be
 *                non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
 *
 * @throws IllegalArgumentException If the {@code timeout} is negative.
 * @throws InterruptException If the thread is interrupted before or while this function is called
 * @throws org.apache.kafka.common.KafkaException for any other error during close
 */
public void close(Duration timeout)

Proposed Changes

Note that in all cases, new methods are being added. The old methods will behave exactly as today, and will be marked "deprecated since 2.0" to provide a clean migration path.

Regarding the policy of what happens when the time limit is exceeded:

1. The new KafkaConsumer#poll(), since it returns records, will return an empty ConsumerRecords response.

2. A ClientTimeoutException will be introduced to allow users to more clearly identify the reason why the method timed out (e.g. LeaderNotAvailable, RequestTimeout, etc.).

3. A ClientTimeoutException will be thrown for other methods when they time out, citing the cause as a "RequestTimeout".

Note: In the current version, fetchCommittedOffsets() will block forever if the committed offsets cannot be fetched successfully, which affects position() and committed(). We need to break out of its internal while loop.
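Breaking out of such a loop usually means checking a deadline on every iteration. The following is a minimal sketch under stated assumptions: `BoundedRetry`, `retryUntil`, and the `backoff` parameter are hypothetical, the `attempt` supplier stands in for one fetch round trip, and `java.util.concurrent.TimeoutException` stands in for the Kafka exception. It is not the actual coordinator code.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical retry loop with a deadline; not the coordinator implementation.
final class BoundedRetry {
    static <T> T retryUntil(Supplier<Optional<T>> attempt, Duration timeout, Duration backoff)
            throws TimeoutException, InterruptedException {
        final long startNanos = System.nanoTime();
        final long budgetNanos = timeout.toNanos();
        while (true) {
            Optional<T> result = attempt.get();
            if (result.isPresent()) {
                return result.get();
            }
            // The deadline check is what turns an unbounded loop into a bounded one.
            long remaining = budgetNanos - (System.nanoTime() - startNanos);
            if (remaining <= 0) {
                throw new TimeoutException("no result within " + timeout);
            }
            // Back off before retrying, but never sleep past the deadline.
            long sleepNanos = Math.min(backoff.toNanos(), remaining);
            Thread.sleep(sleepNanos / 1_000_000L, (int) (sleepNanos % 1_000_000L));
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String value = BoundedRetry.<String>retryUntil(
                () -> ++calls[0] < 3 ? Optional.<String>empty() : Optional.of("offsets"),
                Duration.ofSeconds(1), Duration.ofMillis(1));
        System.out.println(value + " after " + calls[0] + " attempts");
    }
}
```

The attempt always runs at least once, so a zero timeout still permits a single non-blocking try before the exception is raised.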

Compatibility, Deprecation, and Migration Plan

Since old methods will not be modified, preexisting data frameworks will not be affected. However, these methods will be deprecated in favor of methods that are bounded by a specific time limit.

...