
Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In KAFKA-4879, we first noticed that KafkaConsumer#position() can hang forever when a topic is deleted. However, on closer inspection we found that several KafkaConsumer methods will block indefinitely unless the offsets for the provided TopicPartition can be retrieved. To prevent this scenario:

  1. For each method that can block indefinitely, a complementary method will be added that takes an extra timeout parameter, together with a TimeUnit argument, which bounds the amount of time spent in the method.
  2. A ClientTimeoutException will be thrown once the time spent in the method exceeds timeout.

Public Interfaces

A ClientTimeoutException will be thrown when the time spent exceeds timeout:

KafkaConsumer#position(TopicPartition topicPartition)
    /**
     * Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
     * This method may issue a remote call to the server if there is no current position for the given partition.
     * <p>
     * This call will block until either the position could be determined or an unrecoverable error is
     * encountered (in which case it is thrown to the caller).
     *
     * @param partition The partition to get the position for
+    * @param timeout   The maximum time to block in this method
+    * @param unit      The time unit in which timeout is expressed
     * @return The current position of the consumer (that is, the offset of the next record to be fetched)
     * @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer
     * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for
     *             the partition
     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
     *             function is called
     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
     *             this function is called
+    * @throws org.apache.kafka.common.errors.ClientTimeoutException if the time spent blocking for offsets exceeds timeout
     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
     *             configured groupId. See the exception for more details
     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
     */
+    public long position(TopicPartition partition, long timeout, TimeUnit unit);
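For illustration, a caller could bound the position lookup as follows. This is a minimal sketch assuming an already-constructed KafkaConsumer named consumer; the 30-second timeout is an arbitrary example value.

    // Caller-side sketch: bound the position lookup instead of blocking indefinitely.
    TopicPartition partition = new TopicPartition("my-topic", 0);
    try {
        long offset = consumer.position(partition, 30, TimeUnit.SECONDS);
        System.out.println("Next offset to fetch: " + offset);
    } catch (ClientTimeoutException e) {
        // The position could not be determined within the given timeout.
    }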

Similarly, this will also be applied to the other KafkaConsumer methods that block indefinitely.

KafkaConsumer#blocking methods
    /**
     * Get the last committed offset for the given partition (whether the commit happened by this process or
     * another). This offset will be used as the position for the consumer in the event of a failure.
     * <p>
     * This call will block to do a remote call to get the latest committed offsets from the server.
     *
     * @param partition The partition to check
+    * @param timeout   The maximum time to block in this method
+    * @param timeunit  The time unit in which timeout is expressed
     * @return The last committed offset and metadata or null if there was no prior commit
     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
     *             function is called
     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
     *             this function is called
     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
     *             configured groupId. See the exception for more details
     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+    * @throws org.apache.kafka.common.errors.ClientTimeoutException if the method blocks longer than the given timeout
     */
    @Override
    public OffsetAndMetadata committed(TopicPartition partition, final long timeout, final TimeUnit timeunit) {
        acquireAndEnsureOpen();
        final long totalWaitTime = determineWaitTimeInMilliseconds(timeout, timeunit);
        try {
            Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(Collections.singleton(partition), totalWaitTime);
            return offsets.get(partition);
        } finally {
            release();
        }
    }

    /**
     * Commit the specified offsets for the specified list of topics and partitions.
     * <p>
     * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
     * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
     * should not be used. The committed offset should be the next message your application will consume,
     * i.e. lastProcessedMessageOffset + 1.
     * <p>
     * This is a synchronous commit and will block until either the commit succeeds or an unrecoverable error is
     * encountered (in which case it is thrown to the caller).
     * <p>
     * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
     * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
     *
     * @param offsets  A map of offsets by partition with associated metadata
+    * @param timeout  The maximum time to block in this method
+    * @param timeunit The time unit in which timeout is expressed
     * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
     *             This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
     *             or if there is an active group with the same groupId which is using group management.
     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
     *             function is called
     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
     *             this function is called
+    * @throws org.apache.kafka.common.errors.ClientTimeoutException if the method blocks longer than the given timeout
     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
     *             configured groupId. See the exception for more details
     * @throws java.lang.IllegalArgumentException if the committed offset is negative
     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
     *             is too large or if the topic does not exist).
     */
    @Override
    public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final long timeout, final TimeUnit timeunit) {
        final long totalWaitTime = determineWaitTimeInMilliseconds(timeout, timeunit);
        acquireAndEnsureOpen();
        try {
            coordinator.commitOffsetsSync(new HashMap<>(offsets), totalWaitTime);
        } finally {
            release();
        }
    }
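The helper determineWaitTimeInMilliseconds used in both methods above is not spelled out in this KIP; a minimal sketch of the intended conversion, under that assumption, could look like:

    // Hypothetical helper (not defined elsewhere in this KIP): converts the
    // user-supplied timeout and TimeUnit into the millisecond wait time that is
    // passed on to the coordinator calls.
    private static long determineWaitTimeInMilliseconds(final long timeout, final TimeUnit timeunit) {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout must not be negative");
        return timeunit.toMillis(timeout);
    }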

Currently, commitSync does not accept a user-provided timeout; by default it blocks indefinitely by setting its wait time to Long.MAX_VALUE. To guard against such an indefinite block, the new KafkaConsumer#commitSync will accept a user-specified timeout.
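For reference, the existing overload could keep its current behavior by delegating to the timed variant. This delegation is a sketch, not something mandated by the proposal:

    // Sketch only: the existing commitSync(offsets) keeps its indefinite-blocking
    // behavior by delegating with an effectively unbounded timeout.
    @Override
    public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
        commitSync(offsets, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }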

Proposed Changes

One change will be made to KafkaConsumer#poll(), whose calls into Kafka internals (specifically fetchCommittedOffsets()) can block indefinitely. This is the same problem faced by other methods such as committed() and position(). To avoid blocking indefinitely, the new methods will take a user-supplied timeout that defines the maximum amount of time for which a method may block.

Regarding the policy of what happens when the time limit is exceeded:

1. KafkaConsumer#poll(), since it returns records, will return an empty record set (or, as KIP-288 suggests, throw a TimeoutException while metadata is updating).

2. A ClientTimeoutException will be introduced so that users can more clearly identify why the method timed out (e.g. LeaderNotAvailable, RequestTimeout, etc.).

3. A ClientTimeoutException will be thrown by the other methods when they time out, citing the cause as a "RequestTimeout" (see the sketch below).
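A caller-side sketch of how the timeout reason could be inspected. It assumes the underlying reason is exposed through the standard getCause() chain, which is an assumption of this sketch and not fixed by the KIP:

    // Sketch: the application decides how to react based on why the call timed out.
    try {
        consumer.commitSync(offsets, 5, TimeUnit.SECONDS);
    } catch (ClientTimeoutException e) {
        // Assumption: the underlying reason (e.g. a request timeout or an
        // unavailable leader) is carried as the exception's cause.
        Throwable cause = e.getCause();
        System.err.println("commitSync timed out: " + (cause != null ? cause : "unknown"));
    }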

Note: in the current version, fetchCommittedOffsets() will block forever if the committed offsets cannot be fetched successfully, which affects position() and committed(). We need to break out of its internal while loop, as sketched below.
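A sketch of how that loop could be bounded; client and hasCommittedOffsets are placeholders for illustration, not the actual internals:

    // Sketch only: replace the unbounded retry loop in fetchCommittedOffsets()
    // with a deadline derived from the caller-supplied wait time.
    final long deadline = System.currentTimeMillis() + totalWaitTime;
    while (!hasCommittedOffsets()) {                      // placeholder condition
        final long remaining = deadline - System.currentTimeMillis();
        if (remaining <= 0)
            throw new ClientTimeoutException("Timed out while fetching committed offsets");
        client.poll(remaining);                           // wait at most the remaining time
    }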

Compatibility, Deprecation, and Migration Plan

Since the old methods will not be modified, preexisting applications and frameworks will not be affected. However, these methods might be deprecated in favor of the new time-bounded variants.

Feasible and Rejected Alternatives

Please see KIP-288 for other rejected alternatives.

In discussion, many have raised the idea of using a new config to set the timeout for the methods being changed in this KIP. Using a single config for all methods is not recommended; however, a config could be shared by similar methods (e.g. all methods that call updateFetchPositions() could block using one timeout configured by the user). In this manner, both the config and the added timeout parameter could be incorporated into the code.

Another alternative of interest is to add a new overload for poll(), particularly since changing the old method can become unwieldy across different Kafka versions. To elaborate, a timeout parameter would also be added to the poll() overload.
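A sketch of what such an overload might look like; the exact signature is not fixed by this KIP text:

    // Sketch only: a time-bounded poll() overload alongside the existing poll(long timeout).
    public ConsumerRecords<K, V> poll(final long timeout, final TimeUnit unit);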

One alternative was to add a timeout parameter to the current position() and other methods. However, the changes required of users would be much more extensive than basing the time constraint on requestTimeoutMs, because the method signature would change.

 

Another possibility was to use requestTimeoutMs to bound position(); however, this would make the method highly inflexible, especially since requestTimeoutMs is already used by multiple other methods.
