...
In KAFKA-4879, we first noticed that KafkaConsumer can hang forever during a position()
call. On closer inspection, we found that several KafkaConsumer
methods block indefinitely until the offsets are retrieved for the provided TopicPartition.
To prevent this:
- A complementary method will be added for each method that blocks indefinitely, with an extra
Duration
parameter, timeout, which bounds the amount of time spent in the method.
- A
TimeoutException
will be thrown once the amount of time spent exceeds timeout.
Public Interfaces
Unless otherwise specified, the pre-existing variants for these new interfaces will be marked @Deprecated.
Consumer#position
A TimeoutException
will be thrown when the time spent exceeds timeout
:
Code Block |
---|
/**
* Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
* This method may issue a remote call to the server if there is no current position for the given partition.
* <p>
* This call will block until either the position could be determined or an unrecoverable error is
* encountered (in which case it is thrown to the caller).
*
* @param partition The partition to get the position for
* @param timeout The maximum duration of the method
* @return The current position of the consumer (that is, the offset of the next record to be fetched)
* @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer
* @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for
* the partition
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
* this function is called
* @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId. See the exception for more details
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
*/
public long position(TopicPartition partition, Duration timeout); |
...
Consumer#committed and Consumer#commitSync
Similarly, the same change will be applied to the other methods in KafkaConsumer that block indefinitely.
Code Block |
---|
/**
* Get the last committed offset for the given partition (whether the commit happened by this process or
* another). This offset will be used as the position for the consumer in the event of a failure.
* <p>
* This call will block to do a remote call to get the latest committed offsets from the server.
*
* @param partition The partition to check
* @param timeout The maximum duration of the method
* @return The last committed offset and metadata or null if there was no prior commit
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
* this function is called
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId. See the exception for more details
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
* @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
*/
OffsetAndMetadata committed(TopicPartition partition, final Duration timeout);

/**
* Commit the specified offsets for the specified list of topics and partitions.
* <p>
* This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
* rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
* should not be used. The committed offset should be the next message your application will consume,
* i.e. lastProcessedMessageOffset + 1.
* <p>
* This is a synchronous commit and will block until either the commit succeeds or an unrecoverable error is
* encountered (in which case it is thrown to the caller).
* <p>
* Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
* (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
*
* @param offsets A map of offsets by partition with associated metadata
* @param timeout Maximum duration to block
* @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
* This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
* or if there is an active group with the same groupId which is using group management.
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
* this function is called
* @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId. See the exception for more details
* @throws java.lang.IllegalArgumentException if the committed offset is negative
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
* is too large or if the topic does not exist).
*/
void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout); |
Currently, commitSync
does not accept a user-provided timeout and, by default, blocks indefinitely by setting the wait time to Long.MAX_VALUE.
To accommodate a potentially hanging call,
the new KafkaConsumer#commitSync
will accept a user-specified timeout.
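Because "wait forever" is expressed as Long.MAX_VALUE, any code that converts a timeout into an absolute deadline has to guard against arithmetic overflow. The sketch below shows one way to do that. The Deadlines class and its method names are hypothetical and only illustrate the arithmetic; they are not taken from the Kafka code base.

```java
/** Hypothetical helper illustrating overflow-safe deadline arithmetic. */
final class Deadlines {
    private Deadlines() { }

    /**
     * Returns the absolute deadline for a call that starts at {@code nowMs}
     * (assumed non-negative) and may block for at most {@code timeoutMs}.
     * The result saturates at Long.MAX_VALUE instead of wrapping around.
     */
    static long deadlineMs(long nowMs, long timeoutMs) {
        if (timeoutMs < 0) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        long deadline = nowMs + timeoutMs;
        // With non-negative operands, a negative sum means the addition overflowed.
        return deadline < 0 ? Long.MAX_VALUE : deadline;
    }

    /** Returns the time left before the deadline, never less than zero. */
    static long remainingMs(long nowMs, long deadlineMs) {
        return Math.max(0L, deadlineMs - nowMs);
    }
}
```

With this shape, the unbounded legacy behaviour and a bounded user timeout can share one code path: the former simply yields a deadline that is never reached.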
Consumer#poll
Additionally, poll()
currently has two use cases: to block on initial assignment metadata (and not poll for records), and to poll for records. We'll discard the first (unintentional) use case and truly enforce the timeout in poll for both metadata and data.
Note that poll() doesn't throw a TimeoutException
because its async semantics are well defined. I.e., it is well defined to return an empty response when there's no data available, and it's designed to be called repeatedly to check for data (hence the name).
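The "empty result instead of an exception" contract can be illustrated with a small stand-alone model. This is only a sketch: BoundedPoller and deliver are hypothetical names, and an in-memory queue stands in for the consumer's fetch buffer. It is not how KafkaConsumer is implemented.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a consumer's fetch buffer; not part of the Kafka API. */
final class BoundedPoller<T> {
    private final BlockingQueue<T> buffer = new LinkedBlockingQueue<>();

    /** Simulates a record arriving from the network. */
    void deliver(T record) {
        buffer.add(record);
    }

    /**
     * Waits up to {@code timeout} for at least one record. When nothing
     * arrives in time, an empty list is returned rather than an exception.
     */
    List<T> poll(Duration timeout) {
        if (timeout.isNegative()) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        final T first;
        try {
            first = buffer.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while polling", e);
        }
        if (first == null) {
            return Collections.emptyList(); // timed out: empty result, no exception
        }
        List<T> records = new ArrayList<>();
        records.add(first);
        buffer.drainTo(records); // also take whatever is already buffered
        return records;
    }
}
```

A caller is expected to invoke poll in a loop and treat an empty list as "nothing yet", which is why a timeout is not an error for this method.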
Code Block |
---|
/**
* Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have
* subscribed to any topics or partitions before polling for data.
* <p>
* On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last
* consumed offset can be manually set through {@link #seek(TopicPartition, long)} or automatically set as the last committed
* offset for the subscribed list of partitions
*
*
* @param timeout The maximum time to block and poll for metadata updates or data.
*
* @return map of topic to records since the last fetch for the subscribed list of topics and partitions
*
* @throws org.apache.kafka.clients.consumer.InvalidOffsetException if the offset for a partition or set of
* partitions is undefined or out of range and no offset reset policy has been configured
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
* this function is called
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if caller lacks Read access to any of the subscribed
* topics or to the configured groupId. See the exception for more details
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. invalid groupId or
* session timeout, errors deserializing key/value pairs, or any new error cases in future versions)
* @throws java.lang.IllegalArgumentException if the timeout value is negative
* @throws java.lang.IllegalStateException if the consumer is not subscribed to any topics or manually assigned any
* partitions to consume from
*/
public ConsumerRecords<K, V> poll(final Duration timeout) |
We will mark the existing poll()
method as deprecated.
Consumer#partitionsFor
Code Block |
---|
/**
* Get metadata about the partitions for a given topic. This method will issue a remote call to the server if it
* does not already have any metadata about the given topic.
*
* @param topic The topic to get partition metadata for
* @param timeout The maximum time this operation will block
*
* @return The list of partitions
* @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
* this function is called
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic. See the exception for more details
* @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before
* expiration of the configured request timeout
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
*/
List<PartitionInfo> partitionsFor(String topic, Duration timeout); |
Consumer#listTopics
Code Block |
---|
/**
* Get metadata about partitions for all topics that the user is authorized to view. This method will issue a
* remote call to the server.
*
* @param timeout The maximum time this operation will block
*
* @return The map of topics and its partitions
* @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
* this function is called
* @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before
* expiration of the configured request timeout
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
*/
Map<String, List<PartitionInfo>> listTopics(Duration timeout) |
Consumer#offsetsForTimes
Code Block |
---|
/**
* Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
* earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
*
* This is a blocking call. The consumer does not have to be assigned the partitions.
* If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null
* will be returned for that partition.
*
* @param timestampsToSearch the mapping from partition to the timestamp to look up.
* @param timeout The maximum time this operation will block
*
* @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
* than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
* such message.
* @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
* @throws IllegalArgumentException if the target timestamp is negative
* @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
* expiration of the configured {@code request.timeout.ms}
* @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not support looking up
* the offsets by timestamp
*/
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) |
Consumer#beginningOffsets
Code Block |
---|
/**
* Get the first offset for the given partitions.
* <p>
* This method does not change the current consumer position of the partitions.
*
* @see #seekToBeginning(Collection)
*
* @param partitions the partitions to get the earliest offsets.
* @param timeout The maximum time this operation will block
*
* @return The earliest available offsets for the given partitions
* @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
* @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before
* expiration of the configured {@code request.timeout.ms}
*/
Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) |
Consumer#endOffsets
Code Block |
---|
/**
* Get the end offsets for the given partitions. In the default {@code read_uncommitted} isolation level, the end
* offset is the high watermark (that is, the offset of the last successfully replicated message plus one). For
* {@code read_committed} consumers, the end offset is the last stable offset (LSO), which is the minimum of
* the high watermark and the smallest offset of any open transaction. Finally, if the partition has never been
* written to, the end offset is 0.
*
* <p>
* This method does not change the current consumer position of the partitions.
*
* @see #seekToEnd(Collection)
*
* @param partitions the partitions to get the end offsets.
* @param timeout The maximum time this operation will block
*
* @return The end offsets for the given partitions.
* @throws org.apache.kafka.common.errors.TimeoutException if time spent blocking exceeds the {@code timeout}
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
* @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before
* expiration of the configured {@code request.timeout.ms}
*/
Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) |
Consumer#close
Notes:
- close()
already exists as a variant with no parameters and applies a default from config. This variant will NOT be deprecated.
- close(long, TimeUnit)
also exists as a variant. This one WILL be deprecated in favor of the new close(Duration)
variant for consistency.
- The existing semantics of close is not to throw a
TimeoutException
. Instead, after waiting for the timeout, it forces itself closed.
Code Block |
---|
/**
* Tries to close the consumer cleanly within the specified timeout. This method waits up to
* {@code timeout} for the consumer to complete pending commits and leave the group.
* If auto-commit is enabled, this will commit the current offsets if possible within the
* timeout. If the consumer is unable to complete offset commits and gracefully leave the group
* before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be
* used to interrupt close.
*
* @param timeout The maximum time to wait for consumer to close gracefully. The value must be
* non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
*
* @throws IllegalArgumentException If the {@code timeout} is negative.
* @throws InterruptException If the thread is interrupted before or while this function is called
* @throws org.apache.kafka.common.KafkaException for any other error during close
*/
public void close(Duration timeout) |
Proposed Changes
Note that in all cases, new methods are being added. The old methods will behave exactly as today, and will be marked "deprecated since 2.0" to provide a clean migration path.
Regarding the policy of what happens when the time limit is exceeded:
1. The new KafkaConsumer#poll()
, since it returns records, will return an empty ConsumerRecords response.
2. A TimeoutException
will be thrown by the other methods when they time out.
Note: In the current version, fetchCommittedOffsets() blocks forever if the committed offsets cannot be fetched successfully, which affects position() and committed(). We need to break out of its internal while loop once the timeout expires.
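The general shape of such a bounded loop can be sketched as follows. This is an illustration under stated assumptions rather than the actual change to fetchCommittedOffsets(): BoundedRetry, retryUntil and TimedOutException are hypothetical names, an empty Optional models a retriable failure, and the exception is unchecked in the same way Kafka's TimeoutException is.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical helper; it is not Kafka's implementation of fetchCommittedOffsets(). */
final class BoundedRetry {
    private BoundedRetry() { }

    /** Hypothetical unchecked exception signalling that the time budget ran out. */
    static final class TimedOutException extends RuntimeException {
        TimedOutException(String message) {
            super(message);
        }
    }

    /**
     * Calls {@code attempt} until it yields a value or {@code timeout} elapses.
     * The attempt is always made at least once, even with a zero timeout.
     */
    static <T> T retryUntil(Supplier<Optional<T>> attempt, Duration timeout, Duration backoff) {
        final long startNanos = System.nanoTime();
        final long timeoutNanos = timeout.toNanos();
        while (true) {
            Optional<T> result = attempt.get();
            if (result.isPresent()) {
                return result.get();
            }
            long remainingNanos = timeoutNanos - (System.nanoTime() - startNanos);
            if (remainingNanos <= 0) {
                // This exit is what an unbounded while loop lacks.
                throw new TimedOutException("operation did not complete within " + timeout);
            }
            sleepNanos(Math.min(remainingNanos, backoff.toNanos()));
        }
    }

    private static void sleepNanos(long nanos) {
        try {
            Thread.sleep(nanos / 1_000_000L, (int) (nanos % 1_000_000L));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while backing off", e);
        }
    }
}
```

Capping each backoff at the remaining time keeps the total wait close to the requested timeout instead of overshooting by a full backoff interval.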
Compatibility, Deprecation, and Migration Plan
Since old methods will not be modified, preexisting data frameworks will not be affected. However, these methods will be deprecated in favor of methods that are bounded by a specific time limit.
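For callers that currently pass a (long, TimeUnit) pair, for example to close(long, TimeUnit), migrating to the Duration-based variants mostly amounts to a conversion at the call site. A minimal sketch of such a conversion is shown below. The Timeouts class and toDuration method are hypothetical helpers for illustration and are not part of this proposal.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

/** Hypothetical migration helper; not part of the proposed public interface. */
final class Timeouts {
    private Timeouts() { }

    /**
     * Converts a legacy (timeout, unit) pair into a Duration with millisecond
     * precision. Sub-millisecond amounts are truncated, and very large values
     * saturate at Long.MAX_VALUE milliseconds because TimeUnit#toMillis does.
     */
    static Duration toDuration(long timeout, TimeUnit unit) {
        if (timeout < 0) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        return Duration.ofMillis(unit.toMillis(timeout));
    }
}
```

A call such as close(5, TimeUnit.SECONDS) could then become close(Timeouts.toDuration(5, TimeUnit.SECONDS)), or simply close(Duration.ofSeconds(5)) where the value is a literal.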
...