...
In KAFKA-4879, we first noticed that KafkaConsumer will hang forever when a topic is deleted. However, after looking closer, we found that, more generally, several KafkaConsumer methods will block indefinitely unless the offsets for the provided TopicPartition can be retrieved. These are position(TopicPartition topicPartition), commitSync(), committed(), and poll() (because pollOnce() calls updateFetchPositions(), which blocks through a series of callbacks). To prevent this:
- For each method that can block indefinitely, a complementary overload will be added that takes two extra parameters: a timeout and a TimeUnit for that timeout. Together they bound the amount of time spent in the method.
- A ClientTimeoutException will be thrown once the time spent in the method exceeds timeout.
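A minimal sketch of how such an overload could bound its blocking time, assuming the blocking work can be modeled as a condition that is checked repeatedly. `BoundedWait`, `awaitCondition`, and the local `ClientTimeoutException` class are hypothetical names for illustration only; the real consumer blocks inside its network client rather than in a sleep loop.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for the proposed org.apache.kafka.common.errors.ClientTimeoutException.
class ClientTimeoutException extends RuntimeException {
    ClientTimeoutException(String message) {
        super(message);
    }
}

final class BoundedWait {
    private BoundedWait() {}

    /**
     * Checks {@code done} until it returns true, or throws ClientTimeoutException
     * once {@code timeout} (in {@code unit}) has elapsed.
     */
    static void awaitCondition(BooleanSupplier done, long timeout, TimeUnit unit) {
        if (timeout < 0) {
            throw new IllegalArgumentException("timeout must be non-negative");
        }
        // System.nanoTime() is monotonic, so wall-clock changes cannot shorten or stretch the wait.
        final long start = System.nanoTime();
        // TimeUnit.toNanos saturates at Long.MAX_VALUE, so huge timeouts mean "effectively forever".
        final long budgetNanos = unit.toNanos(timeout);
        while (!done.getAsBoolean()) {
            long elapsed = System.nanoTime() - start;
            if (elapsed >= budgetNanos) {
                throw new ClientTimeoutException("Condition not met within " + timeout + " " + unit);
            }
            // Sleep briefly (at most 10 ms, never past the deadline by more than ~1 ms) before re-checking.
            long remainingMillis = TimeUnit.NANOSECONDS.toMillis(budgetNanos - elapsed);
            try {
                Thread.sleep(Math.min(10L, Math.max(1L, remainingMillis)));
            } catch (InterruptedException e) {
                // Kafka would surface this as its own InterruptException; a plain runtime
                // exception keeps the sketch self-contained.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
    }
}
```

A caller passes the same timeout and unit it received, for example `BoundedWait.awaitCondition(() -> offsetsKnown(), 5, TimeUnit.SECONDS)`, where `offsetsKnown()` is a hypothetical check. The existing unbounded methods could then delegate with a very large timeout.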
Public Interfaces
A ClientTimeoutException will be thrown when the time spent in one of these methods exceeds timeout:
```java
/**
 * Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
 * This method may issue a remote call to the server if there is no current position for the given partition.
 * <p>
 * This call will block until the position could be determined, an unrecoverable error is
 * encountered (in which case it is thrown to the caller), or the given timeout expires.
 *
 * @param partition The partition to get the position for
 * @param timeout The maximum time to block in this method
 * @param unit The time unit of the {@code timeout} argument
 * @return The current position of the consumer (that is, the offset of the next record to be fetched)
 * @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer
 * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for
 *             the partition
 * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
 *             function is called
 * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
 *             this function is called
 * @throws org.apache.kafka.common.errors.ClientTimeoutException if the time spent blocking for offsets exceeds
 *             {@code timeout}
 * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
 * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
 *             configured groupId. See the exception for more details
 * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
 */
public long position(TopicPartition partition, long timeout, TimeUnit unit);
```
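To illustrate the KAFKA-4879 scenario from the caller's side, here is a toy model rather than the Kafka client. It assumes partitions are plain strings and that offsets "arrive" through a hypothetical `offsetArrived()` call. A partition whose offset never arrives (for example, because its topic was deleted) would make an unbounded lookup wait forever. The bounded `position(partition, timeout, unit)` instead throws, and the caller regains control. `ToyConsumer`, `PositionCaller`, and the local `ClientTimeoutException` are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the proposed org.apache.kafka.common.errors.ClientTimeoutException.
class ClientTimeoutException extends RuntimeException {
    ClientTimeoutException(String message) {
        super(message);
    }
}

// Toy model of a consumer: offsets are filled in by offsetArrived(), standing in for fetch responses.
class ToyConsumer {
    private final Map<String, Long> fetchedOffsets = new ConcurrentHashMap<>();

    void offsetArrived(String partition, long offset) {
        fetchedOffsets.put(partition, offset);
    }

    // Bounded lookup: returns the offset once known, or throws after the timeout elapses.
    long position(String partition, long timeout, TimeUnit unit) {
        final long start = System.nanoTime();
        final long budgetNanos = unit.toNanos(timeout);
        while (true) {
            Long offset = fetchedOffsets.get(partition);
            if (offset != null) {
                return offset;
            }
            if (System.nanoTime() - start >= budgetNanos) {
                throw new ClientTimeoutException(
                        "Position for " + partition + " not determined within " + timeout + " " + unit);
            }
            try {
                Thread.sleep(5L); // stand-in for one round of network I/O
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for position", e);
            }
        }
    }
}

class PositionCaller {
    // Returns the position of the partition, or the fallback if it is not known within 100 ms.
    static long positionOrDefault(ToyConsumer consumer, String partition, long fallback) {
        try {
            return consumer.position(partition, 100, TimeUnit.MILLISECONDS);
        } catch (ClientTimeoutException e) {
            // The caller decides what to do next, e.g. refresh metadata or skip the partition.
            return fallback;
        }
    }
}
```

Returning a fallback value is only one possible policy. A real application might instead log the timeout, re-check whether the topic still exists, or propagate the exception.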
...
```java
/**
 * Get the last committed offset for the given partition (whether the commit happened by this process or
 * another). This offset will be used as the position for the consumer in the event of a failure.
 * <p>
 * This call will block to do a remote call to get the latest committed offsets from the server,
 * for at most the given timeout.
 *
 * @param partition The partition to check
 * @param timeout The maximum time to block in this method
 * @param timeunit The time unit of the {@code timeout} argument
 * @return The last committed offset and metadata or null if there was no prior commit
 * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
 *             function is called
 * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
 *             this function is called
 * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
 * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
 *             configured groupId. See the exception for more details
 * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
 * @throws org.apache.kafka.common.errors.ClientTimeoutException if the method blocks for longer than {@code timeout}
 */
@Override
public OffsetAndMetadata committed(TopicPartition partition, final long timeout, final TimeUnit timeunit) {
    // Convert the timeout before taking the lock so an invalid argument cannot leave the lock held.
    final long totalWaitTime = determineWaitTimeInMilliseconds(timeout, timeunit);
    acquireAndEnsureOpen();
    try {
        Map<TopicPartition, OffsetAndMetadata> offsets =
                coordinator.fetchCommittedOffsets(Collections.singleton(partition), totalWaitTime);
        return offsets.get(partition);
    } finally {
        release();
    }
}

/**
 * Commit the specified offsets for the specified list of topics and partitions.
 * <p>
 * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
 * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
 * should not be used. The committed offset should be the next message your application will consume,
 * i.e. lastProcessedMessageOffset + 1.
 * <p>
 * This is a synchronous commit and will block until the commit succeeds, an unrecoverable error is
 * encountered (in which case it is thrown to the caller), or the given timeout expires.
 * <p>
 * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
 * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
 *
 * @param offsets A map of offsets by partition with associated metadata
 * @param timeout The maximum time to block in this method
 * @param timeunit The time unit of the {@code timeout} argument
 * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
 *             This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
 *             or if there is an active group with the same groupId which is using group management.
 * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
 *             function is called
 * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
 *             this function is called
 * @throws org.apache.kafka.common.errors.ClientTimeoutException if the method blocks for longer than {@code timeout}
 * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
 * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
 *             configured groupId. See the exception for more details
 * @throws java.lang.IllegalArgumentException if the committed offset is negative
 * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
 *             is too large or if the topic does not exist).
 */
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final long timeout,
                       final TimeUnit timeunit) {
    final long totalWaitTime = determineWaitTimeInMilliseconds(timeout, timeunit);
    acquireAndEnsureOpen();
    try {
        coordinator.commitOffsetsSync(new HashMap<>(offsets), totalWaitTime);
    } finally {
        release();
    }
}
```
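Both overloads above call determineWaitTimeInMilliseconds, but the proposal does not define it. The following is a guess at what such a helper might do, not the proposal's actual code. It assumes that negative timeouts are rejected and that a positive sub-millisecond timeout rounds up to 1 ms. `WaitTimes` and `remainingMillis` are hypothetical names added for illustration.

```java
import java.util.Objects;
import java.util.concurrent.TimeUnit;

final class WaitTimes {
    private WaitTimes() {}

    // Hypothetical body for the helper the proposal calls but does not define.
    static long determineWaitTimeInMilliseconds(long timeout, TimeUnit unit) {
        Objects.requireNonNull(unit, "unit");
        if (timeout < 0) {
            throw new IllegalArgumentException("timeout must be non-negative, got " + timeout);
        }
        // TimeUnit.toMillis saturates at Long.MAX_VALUE instead of overflowing.
        long millis = unit.toMillis(timeout);
        // Round a positive sub-millisecond timeout up to 1 ms so it is not mistaken for "do not wait".
        return (millis == 0 && timeout > 0) ? 1L : millis;
    }

    // Hypothetical helper: milliseconds left of a budget that began at startNanos (a System.nanoTime() value).
    static long remainingMillis(long startNanos, long budgetMillis) {
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        return Math.max(0L, budgetMillis - elapsedMillis);
    }
}
```

The name totalWaitTime suggests one budget for the whole call. If a method needs several blocking steps (say, a coordinator lookup followed by the commit request), it could record `System.nanoTime()` on entry and pass `remainingMillis(start, totalWaitTime)` to each step. That way the steps share the caller's timeout instead of each receiving the full amount.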
...