...

In KAFKA-4879, we first noticed that KafkaConsumer will hang forever after a topic is deleted. On closer inspection, we found that, in general, several KafkaConsumer methods, including position(TopicPartition topicPartition), commitSync(), committed(), and poll() (since pollOnce() calls updateFetchPositions(), which blocks through a series of callbacks), can block indefinitely while waiting for the offsets of the provided TopicPartition to be retrieved. To prevent this:

  1. For each method that can block indefinitely, a complementary overload will be added that takes two extra parameters, a timeout and its TimeUnit, which together bound the time spent in the method.
  2. A ClientTimeoutException will be thrown once the time spent in the method exceeds the timeout.
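The two rules above can be sketched as a deadline-bounded retry loop. This is a minimal, self-contained illustration, not KafkaConsumer code: `awaitResult`, the polling `Supplier`, and the local `ClientTimeoutException` class are hypothetical stand-ins, and a real consumer would wait on network responses rather than sleep and retry.

```java
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class BoundedWait {

    // Hypothetical stand-in for the proposed
    // org.apache.kafka.common.errors.ClientTimeoutException.
    static class ClientTimeoutException extends RuntimeException {
        ClientTimeoutException(String message) {
            super(message);
        }
    }

    /**
     * Retries {@code attempt} until it yields a value or {@code timeout} elapses.
     * The unbounded variant of this loop would simply never give up.
     */
    static <T> T awaitResult(Supplier<Optional<T>> attempt, long timeout, TimeUnit unit) {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout must be non-negative: " + timeout);
        final long budgetNs = unit.toNanos(timeout);   // saturates instead of overflowing
        final long startNs = System.nanoTime();
        while (true) {
            Optional<T> result = attempt.get();
            if (result.isPresent())
                return result.get();
            long remainingNs = budgetNs - (System.nanoTime() - startNs);
            if (remainingNs <= 0)
                throw new ClientTimeoutException("No result within " + timeout + " " + unit);
            try {
                // Back off briefly between attempts, but never sleep past the deadline.
                TimeUnit.NANOSECONDS.sleep(Math.min(remainingNs, TimeUnit.MILLISECONDS.toNanos(10)));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while waiting", e);
            }
        }
    }

    public static void main(String[] args) {
        // Simulates an offset lookup that never succeeds, e.g. after the topic was deleted.
        Supplier<Optional<Long>> neverFound = Optional::empty;
        try {
            awaitResult(neverFound, 100, TimeUnit.MILLISECONDS);
        } catch (ClientTimeoutException e) {
            System.out.println("Gave up: " + e.getMessage());
        }

        // A lookup that succeeds on the first attempt returns without waiting.
        Supplier<Optional<Long>> found = () -> Optional.of(42L);
        long offset = awaitResult(found, 1, TimeUnit.SECONDS);
        System.out.println("Offset: " + offset);
    }
}
```

Note that at least one attempt is always made, so a timeout of zero still returns a result that is already available.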

Public Interfaces

A ClientTimeoutException will be thrown when the time spent in a method exceeds the given timeout:

Code Block
language: java
title: KafkaConsumer#position(TopicPartition topicPartition)
    /**
     * Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
     * This method may issue a remote call to the server if there is no current position for the given partition.
     * <p>
     * This call will block until the position can be determined, an unrecoverable error is
     * encountered (in which case it is thrown to the caller), or the timeout expires.
     *
     * @param partition The partition to get the position for
     * @param timeout   The maximum amount of time to block
     * @param unit      The time unit of the {@code timeout} argument
     * @return The current position of the consumer (that is, the offset of the next record to be fetched)
     * @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer
     * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for
     *             the partition
     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
     *             function is called
     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
     *             this function is called
     * @throws org.apache.kafka.common.errors.ClientTimeoutException if the time spent blocking for the offsets exceeds the given timeout
     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
     *             configured groupId. See the exception for more details
     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
     */
    public long position(TopicPartition partition, long timeout, TimeUnit unit);
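A call to position() may involve more than one blocking step (the javadoc notes that it may issue a remote call when there is no current position, and pollOnce() reaches updateFetchPositions() through a series of callbacks). If every step were given the full timeout, the call as a whole could exceed it. One way to keep the entire method within the caller's bound is to share a single deadline across all steps. The `Deadline` class below is a hypothetical helper, not part of the Kafka client, and the two steps in `main()` are simulated with a sleep.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper that tracks a single time budget across several blocking
 * steps, so that a chain of remote calls cannot exceed the caller's timeout in total.
 */
public final class Deadline {
    private final long startNs;
    private final long budgetNs;

    private Deadline(long startNs, long budgetNs) {
        this.startNs = startNs;
        this.budgetNs = budgetNs;
    }

    static Deadline after(long timeout, TimeUnit unit) {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout must be non-negative: " + timeout);
        // toNanos saturates at Long.MAX_VALUE, and elapsed time is measured as a
        // difference of nanoTime() readings, so very large timeouts cannot overflow.
        return new Deadline(System.nanoTime(), unit.toNanos(timeout));
    }

    /** Milliseconds left in the budget, never negative. */
    long remainingMs() {
        long remainingNs = budgetNs - (System.nanoTime() - startNs);
        return remainingNs <= 0 ? 0 : TimeUnit.NANOSECONDS.toMillis(remainingNs);
    }

    boolean isExpired() {
        return budgetNs - (System.nanoTime() - startNs) <= 0;
    }

    public static void main(String[] args) throws InterruptedException {
        Deadline deadline = Deadline.after(200, TimeUnit.MILLISECONDS);
        // Step 1 (e.g. fetching the committed offset) consumes part of the budget...
        Thread.sleep(50);
        // ...so step 2 (e.g. resetting the position) is only given what is left.
        System.out.println("Budget left for step 2: " + deadline.remainingMs() + " ms");
        System.out.println("Expired: " + deadline.isExpired());
    }
}
```

Each step would receive `deadline.remainingMs()` rather than the original timeout, and the method would throw the timeout exception as soon as `isExpired()` becomes true.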

...

Code Block
language: java
theme: Eclipse
title: KafkaConsumer#blocking methods
    /**
     * Get the last committed offset for the given partition (whether the commit happened by this process or
     * another). This offset will be used as the position for the consumer in the event of a failure.
     * <p>
     * This call will block while it makes a remote call to fetch the latest committed offsets from the server,
     * for at most the given timeout.
     *
     * @param partition The partition to check
     * @param timeout   The maximum amount of time to block
     * @param timeunit  The time unit of the {@code timeout} argument
     * @return The last committed offset and metadata or null if there was no prior commit
     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
     *             function is called
     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
     *             this function is called
     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
     *             configured groupId. See the exception for more details
     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
     * @throws org.apache.kafka.common.errors.ClientTimeoutException if the method blocks for longer than the given timeout
     */
    @Override
    public OffsetAndMetadata committed(TopicPartition partition, final long timeout, final TimeUnit timeunit) {
        acquireAndEnsureOpen();
        final long totalWaitTime = determineWaitTimeInMilliseconds(timeout, timeunit);
        try {
            Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(Collections.singleton(partition), totalWaitTime);
            return offsets.get(partition);
        } finally {
            release();
        }
    }

    /**
     * Commit the specified offsets for the specified list of topics and partitions.
     * <p>
     * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
     * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
     * should not be used. The committed offset should be the next message your application will consume,
     * i.e. lastProcessedMessageOffset + 1.
     * <p>
     * This is a synchronous commit and will block until the commit succeeds, an unrecoverable error is
     * encountered (in which case it is thrown to the caller), or the timeout expires.
     * <p>
     * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
     * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
     *
     * @param offsets  A map of offsets by partition with associated metadata
     * @param timeout  The maximum amount of time to block
     * @param timeunit The time unit of the {@code timeout} argument
     * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
     *             This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
     *             or if there is an active group with the same groupId which is using group management.
     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
     *             function is called
     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
     *             this function is called
     * @throws org.apache.kafka.common.errors.ClientTimeoutException if the method blocks for longer than the given timeout
     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
     *             configured groupId. See the exception for more details
     * @throws java.lang.IllegalArgumentException if the committed offset is negative
     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
     *             is too large or if the topic does not exist).
     */
    @Override
    public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final long timeout, final TimeUnit timeunit) {
        final long totalWaitTime = determineWaitTimeInMilliseconds(timeout, timeunit);
        acquireAndEnsureOpen();
        try {
            coordinator.commitOffsetsSync(new HashMap<>(offsets), totalWaitTime);
        } finally {
            release();
        }
    }
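The committed() and commitSync() bodies above rely on a determineWaitTimeInMilliseconds helper that is not shown, and on coordinator methods that accept a millisecond budget. The sketch below shows one plausible shape for both, under stated assumptions: the helper is assumed to validate its input and convert with `TimeUnit.toMillis`, and the pending coordinator response is modelled as a `CompletableFuture` whose bounded `get()` is mapped onto the proposed ClientTimeoutException (a local stand-in here). Neither part reflects the actual coordinator internals; `awaitResponse` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CommitWaitSketch {

    // Hypothetical stand-in for the proposed
    // org.apache.kafka.common.errors.ClientTimeoutException.
    static class ClientTimeoutException extends RuntimeException {
        ClientTimeoutException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Assumed behaviour of determineWaitTimeInMilliseconds: validate, then convert. */
    static long determineWaitTimeInMilliseconds(long timeout, TimeUnit timeunit) {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout must be non-negative: " + timeout);
        if (timeunit == null)
            throw new IllegalArgumentException("timeunit must not be null");
        return timeunit.toMillis(timeout); // saturates at Long.MAX_VALUE instead of overflowing
    }

    /** Waits for a pending response for at most totalWaitTimeMs milliseconds. */
    static <T> T awaitResponse(CompletableFuture<T> response, long totalWaitTimeMs) {
        try {
            return response.get(totalWaitTimeMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Translate the JDK's checked timeout into the unchecked client exception.
            throw new ClientTimeoutException("No response within " + totalWaitTimeMs + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for response", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Request failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        long waitMs = determineWaitTimeInMilliseconds(2, TimeUnit.SECONDS);
        System.out.println("Wait budget: " + waitMs + " ms"); // prints "Wait budget: 2000 ms"

        // A response that never arrives, e.g. because the coordinator cannot serve the partition.
        CompletableFuture<Void> neverAnswered = new CompletableFuture<>();
        try {
            awaitResponse(neverAnswered, 100);
        } catch (ClientTimeoutException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Translating `java.util.concurrent.TimeoutException` into an unchecked exception keeps checked exceptions out of the consumer's public signatures, in line with the other exceptions listed in the javadoc above, which are all unchecked.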

...