...
We will also change the leader behavior so that it is not permitted to add a replica to the ISR if it is marked as offline by the controller. By currently allowing this, we weaken acks=all semantics since the zombie contributes to the min.isr
requirement, but is not actually eligible to become leader.
Public Interfaces
Config Changes
As described above, the consumer's `auto.offset.reset` will now support a fourth option to reset offsets to the "closest" position.
API Changes
We will introduce a new exception type, which will be raised from KafkaConsumer.poll(Duration)
as described above. This exception extends from `OffsetOutOfRangeException` for compatibility. If a user's offset reset policy is set to "none," they can still catch this
Code Block |
---|
class TruncatedPartitionException extends OffsetOutOfRangeException {
/**
* Get a map of the partitions which were truncated and the offset
* at which the logs diverged from what we read locally. In other
* words, offsets between this offset and the current fetch offset
* were truncated from the broker's log. If the consumer was unable
* to determine the point of divergence, then the value will be
* `Optional.empty()`.
*/
Map<TopicPartition, Optional<Long>> truncatedOffsets();
} |
The leader epoch will be exposed in the ConsumerRecord
and OffsetAndMetadata
objects.
Code Block |
---|
class ConsumerRecord<K, V> {
int leaderEpoch();
}
class OffsetAndMetadata {
int leaderEpoch();
} |
We will also have a new API to support seeking to an offset and leader epoch. For convenience and consistency, we will reuse the OffsetAndMetadata
object.
} |
The leader epoch will be exposed in the ConsumerRecord
and OffsetAndMetadata
objects.
Code Block |
---|
class ConsumerRecord<K, V> {
int leaderEpoch();
}
class OffsetAndMetadata {
int leaderEpoch();
} |
We will also have a new API to support seeking to an offset and leader epoch. For convenience and consistency, we will reuse the OffsetAndMetadata
object.
Code Block |
---|
/**
* Seek to the nearest offset taking into account leader epoch information.
* If the offset still exists and has the same leader epoch, then this will
* set the position to that offset. Otherwise, if the log has been truncated
* (e.g. due to unclean leader election), then the offset will be the first
* offset of the next largest leader epoch.
*/
void seekToNearest |
Code Block |
void seek(TopicPartition partition, OffsetAndMetadata offset); |
...