Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In KAFKA-4879, we first noticed that KafkaConsumer hangs indefinitely when a topic is deleted. On closer inspection, we found that the position(TopicPartition topicPartition) method in general blocks indefinitely until the offsets for the provided TopicPartition are retrieved. To prevent this:

  1. An overload of the method will be added with an extra timeout parameter that bounds the time spent blocking.
  2. A TimeoutException will be thrown once the time spent exceeds timeout.

Public Interfaces

A TimeoutException will be thrown when the time spent exceeds requestTimeoutMs:

KafkaConsumer#position(TopicPartition topicPartition)
    /**
     * Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
     * This method may issue a remote call to the server if there is no current position for the given partition.
     * <p>
     * This call will block until either the position could be determined or an unrecoverable error is
     * encountered (in which case it is thrown to the caller).
     *
     * @param partition The partition to get the position for
     * @return The current position of the consumer (that is, the offset of the next record to be fetched)
     * @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer
     * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for
     *             the partition
     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
     *             function is called
     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
     *             this function is called
+    * @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking for offsets exceeds requestTimeoutMs
     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
     *             configured groupId. See the exception for more details
     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
     */
+    public long position(TopicPartition partition, long timeout);
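
The bounded-blocking behaviour described above can be sketched without the Kafka client on the classpath. This is only an illustration under stated assumptions, not the proposed implementation: `BoundedPositionSketch`, `PositionTimeoutException`, and the `lookup` supplier are hypothetical names, and the timeout is assumed to be in milliseconds. The exception class stands in for org.apache.kafka.common.errors.TimeoutException.

```java
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;

/** Hypothetical stand-in for the unchecked org.apache.kafka.common.errors.TimeoutException. */
final class PositionTimeoutException extends RuntimeException {
    PositionTimeoutException(String message) {
        super(message);
    }
}

/** Sketch of a position lookup that gives up after a caller-supplied timeout. */
final class BoundedPositionSketch {

    /**
     * Calls the lookup repeatedly until it yields an offset or the timeout elapses.
     *
     * @param lookup    hypothetical offset lookup; returns empty while the offset is unknown
     * @param timeoutMs maximum time to block, assumed here to be in milliseconds
     */
    static long position(Supplier<OptionalLong> lookup, long timeoutMs) {
        if (timeoutMs < 0) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            OptionalLong offset = lookup.get();
            if (offset.isPresent()) {
                return offset.getAsLong();
            }
            // Compare by subtraction so the check stays correct if nanoTime wraps around.
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new PositionTimeoutException(
                        "position could not be determined within " + timeoutMs + " ms");
            }
            // Back off briefly before retrying instead of spinning at full speed.
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1));
        }
    }
}
```

The lookup is always attempted at least once, so a timeout of 0 behaves as a single non-blocking attempt in this sketch.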

 

Proposed Changes

A TimeoutException will be thrown if the time spent in the position() method exceeds requestTimeoutMs.

Compatibility, Deprecation, and Migration Plan

Because earlier versions of position() did not throw TimeoutException, some existing data structures will have to change to account for the case where offsets are not retrieved.
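
As a rough illustration of what accounting for unretrieved offsets could look like on the caller side, the sketch below turns a timeout into an empty result instead of letting the exception propagate. Everything named here is an assumption made for the example: `PositionSource` is a hypothetical one-method view of a consumer, `StandInTimeoutException` stands in for org.apache.kafka.common.errors.TimeoutException, and the topic and partition are passed as plain values rather than as a TopicPartition.

```java
import java.util.OptionalLong;

/** Hypothetical stand-in for the unchecked org.apache.kafka.common.errors.TimeoutException. */
final class StandInTimeoutException extends RuntimeException {
    StandInTimeoutException(String message) {
        super(message);
    }
}

/** Hypothetical one-method view of a consumer; not part of the Kafka API. */
interface PositionSource {
    long position(String topic, int partition, long timeoutMs);
}

final class PositionCallerSketch {

    /** Returns the position, or an empty value if the lookup timed out. */
    static OptionalLong tryPosition(PositionSource source, String topic, int partition, long timeoutMs) {
        try {
            return OptionalLong.of(source.position(topic, partition, timeoutMs));
        } catch (StandInTimeoutException e) {
            // The offset was not retrieved in time; the caller decides whether to retry or skip.
            return OptionalLong.empty();
        }
    }
}
```

Returning an OptionalLong is one possible design choice; a caller could equally retry with a longer timeout or surface the exception unchanged.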

Rejected Alternatives

One alternative was to add a timeout parameter to position(). However, because this changes the method signature, users would have to make far more extensive changes than if the time constraint were based on requestTimeoutMs.

 

Another possibility was to use requestTimeoutMs to bound position(). However, this would make the method highly inflexible, especially since requestTimeoutMs is already used by several other methods.
