...
- A secondary method will be added with an extra timeout parameter, as well as a TimeUnit parameter, which together bound the amount of time spent.
- A TimeoutException will be thrown once the amount of time spent exceeds timeout.
...
/**
 * Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
 * This method may issue a remote call to the server if there is no current position for the given partition.
 * <p>
 * This call will block until either the position could be determined or an unrecoverable error is
 * encountered (in which case it is thrown to the caller).
 *
 * @param partition The partition to get the position for
+ * @param timeout The maximum duration of the method
+ * @param timeunit The time unit which timeout refers to
 * @return The current position of the consumer (that is, the offset of the next record to be fetched)
 * @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer
 * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for
 *             the partition
 * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
 *             function is called
 * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
 *             this function is called
+ * @throws org.apache.kafka.common.errors.TimeoutException if the time spent blocking for offsets exceeds requestTimeoutMs
 * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
 * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
 *             configured groupId. See the exception for more details
 * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
 */
+ public long position(TopicPartition partition, final long timeout, final TimeUnit timeunit);
Similarly, this change will also be applied to other methods in KafkaConsumer that block indefinitely.
/**
* Get the last committed offset for the given partition (whether the commit happened by this process or
* another). This offset will be used as the position for the consumer in the event of a failure.
* <p>
* This call will block to do a remote call to get the latest committed offsets from the server.
*
* @param partition The partition to check
+ * @param timeout The maximum duration of the method
+ * @param timeunit The unit of time timeout refers to
* @return The last committed offset and metadata or null if there was no prior commit
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
* this function is called
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId. See the exception for more details
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+ * @throws org.apache.kafka.common.errors.TimeoutException if the method blocks longer than the given timeout
*/
@Override
public OffsetAndMetadata committed(TopicPartition partition, final long timeout, final TimeUnit timeunit) {
acquireAndEnsureOpen();
final long totalWaitTime = determineWaitTimeInMilliseconds(timeout, timeunit);
try {
Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(Collections.singleton(partition), totalWaitTime);
return offsets.get(partition);
} finally {
release();
}
}
/**
* Commit the specified offsets for the specified list of topics and partitions.
* <p>
* This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
* rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
* should not be used. The committed offset should be the next message your application will consume,
* i.e. lastProcessedMessageOffset + 1.
* <p>
 * This is a synchronous commit and will block until either the commit succeeds or an unrecoverable error is
* encountered (in which case it is thrown to the caller).
* <p>
* Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
* (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
*
* @param offsets A map of offsets by partition with associated metadata
+ * @param timeout The maximum duration of the method
+ * @param timeunit The unit of time which timeout refers to
* @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
* This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
* or if there is an active group with the same groupId which is using group management.
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
* this function is called
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId. See the exception for more details
* @throws java.lang.IllegalArgumentException if the committed offset is negative
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
* is too large or if the topic does not exist).
*/
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final long timeout, final TimeUnit timeunit) {
final long totalWaitTime = determineWaitTimeInMilliseconds(timeout, timeunit);
acquireAndEnsureOpen();
try {
coordinator.commitOffsetsSync(new HashMap<>(offsets), totalWaitTime);
} finally {
release();
}
}
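Both snippets above call a helper, determineWaitTimeInMilliseconds, whose body is not shown in this proposal. A minimal sketch of what such a helper might look like, assuming it simply converts the (timeout, timeunit) pair into the milliseconds expected by the coordinator calls:

```java
import java.util.concurrent.TimeUnit;

public class WaitTime {
    // Hypothetical helper (not defined in this KIP): converts the
    // caller-supplied timeout and its TimeUnit into milliseconds,
    // the unit the coordinator calls operate in.
    static long determineWaitTimeInMilliseconds(final long timeout, final TimeUnit timeunit) {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout must not be negative");
        return timeunit.toMillis(timeout);
    }

    public static void main(String[] args) {
        // e.g. 5 seconds -> 5000 ms
        System.out.println(determineWaitTimeInMilliseconds(5, TimeUnit.SECONDS));
    }
}
```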
Another change will be made to KafkaConsumer#poll(), due to its call to updateFetchPositions(), which blocks indefinitely. To address this, poll() will return automatically once the time limit is reached.
Proposed Changes
A TimeoutException will be thrown if the time spent in the position() method exceeds requestTimeoutMs.
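The bounding behavior described here can be illustrated outside of Kafka. The sketch below (all names hypothetical, not part of the proposal) shows the general deadline pattern the timed methods rely on: retry a non-blocking lookup until it yields a result or the deadline derived from the timeout passes, then throw a TimeoutException to the caller:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DeadlineSketch {
    // Retries a non-blocking lookup until it yields a value or the
    // deadline derived from (timeout, timeunit) passes.
    static long awaitPosition(final long timeout, final TimeUnit timeunit) throws TimeoutException {
        final long deadline = System.currentTimeMillis() + timeunit.toMillis(timeout);
        do {
            final Long position = tryFetchPosition(); // stand-in for the remote offset lookup
            if (position != null)
                return position;
        } while (System.currentTimeMillis() < deadline);
        throw new TimeoutException("no position within " + timeout + " " + timeunit);
    }

    // Stub that succeeds immediately; a real lookup would query the broker.
    private static Long tryFetchPosition() {
        return 42L;
    }

    public static void main(String[] args) throws TimeoutException {
        System.out.println(awaitPosition(1, TimeUnit.SECONDS));
    }
}
```

With a lookup that never succeeds, the same loop falls through to the TimeoutException once the deadline passes, which is the behavior this KIP proposes for position() and the other blocking calls.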
...