Comment: Change the new seek API to seekToNearest

...

We will also change the leader behavior so that it is not permitted to add a replica to the ISR if the controller has marked that replica as offline. Allowing this, as the leader currently does, weakens acks=all semantics: the zombie counts toward the min.isr requirement but is not actually eligible to become leader.
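
The rule above can be sketched as a guard on ISR expansion. This is a minimal illustration and not broker code: `IsrGuard`, `canAddToIsr`, and the `offlineReplicas` set are hypothetical names, and the "caught up" condition is reduced to a boolean.

```java
import java.util.Set;

public class IsrGuard {
    /**
     * Hypothetical guard: a follower may join the ISR only if it has caught up
     * AND the controller has not marked it offline.
     */
    static boolean canAddToIsr(int replicaId, boolean caughtUp, Set<Integer> offlineReplicas) {
        return caughtUp && !offlineReplicas.contains(replicaId);
    }

    public static void main(String[] args) {
        Set<Integer> offline = Set.of(3); // replica 3 is marked offline by the controller
        System.out.println(canAddToIsr(2, true, offline)); // true
        System.out.println(canAddToIsr(3, true, offline)); // false: caught up, but a zombie
    }
}
```

With this guard in place, a caught-up zombie no longer counts toward min.isr, so an acks=all write is only acknowledged by replicas that could actually take over as leader.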

Public Interfaces

Config Changes

As described above, the consumer's `auto.offset.reset` will now support a fourth option to reset offsets to the "closest" position.
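
As an illustration, a consumer might opt in to the proposed policy as follows. This is a sketch under assumptions: the value "closest" exists only in this proposal, the class and method names are hypothetical, and the broker address and group id are placeholders.

```java
import java.util.Properties;

public class ClosestResetConfig {
    /** Builds consumer properties that select the proposed "closest" reset policy. */
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("group.id", "example-group");           // placeholder group id
        // Proposed fourth option, alongside "earliest", "latest", and "none".
        props.put("auto.offset.reset", "closest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("auto.offset.reset"));
    }
}
```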

API Changes

We will introduce a new exception type, which will be raised from `KafkaConsumer.poll(Duration)` as described above. This exception extends `OffsetOutOfRangeException` for compatibility. If a user's offset reset policy is set to "none," they can still catch this

Code Block
class TruncatedPartitionException extends OffsetOutOfRangeException {
    /**
     * Get a map of the partitions which were truncated and the offset
     * at which the logs diverged from what we read locally. In other
     * words, offsets between this offset and the current fetch offset
     * were truncated from the broker's log. If the consumer was unable
     * to determine the point of divergence, then the value will be
     * `Optional.empty()`.
     */
    Map<TopicPartition, Optional<Long>> truncatedOffsets();
}
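
The following sketch shows how an application might react to the new exception. It is not the client implementation: the Kafka types are replaced by local stand-ins so the example compiles on its own, and `resumeOffsets` with its fallback offset is a hypothetical application policy.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

public class TruncationHandling {
    // Stand-ins for the Kafka client types, so the sketch is self-contained.
    static final class TopicPartition {
        final String topic;
        final int partition;
        TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
        @Override public boolean equals(Object o) {
            return o instanceof TopicPartition
                && ((TopicPartition) o).topic.equals(topic)
                && ((TopicPartition) o).partition == partition;
        }
        @Override public int hashCode() { return Objects.hash(topic, partition); }
        @Override public String toString() { return topic + "-" + partition; }
    }

    static class OffsetOutOfRangeException extends RuntimeException {}

    static class TruncatedPartitionException extends OffsetOutOfRangeException {
        private final Map<TopicPartition, Optional<Long>> truncated;
        TruncatedPartitionException(Map<TopicPartition, Optional<Long>> truncated) {
            this.truncated = truncated;
        }
        Map<TopicPartition, Optional<Long>> truncatedOffsets() { return truncated; }
    }

    /**
     * Hypothetical policy: resume from the divergence point when it is known,
     * otherwise fall back to an offset chosen by the application.
     */
    static Map<TopicPartition, Long> resumeOffsets(TruncatedPartitionException e, long fallbackOffset) {
        Map<TopicPartition, Long> resume = new HashMap<>();
        e.truncatedOffsets().forEach((tp, divergence) ->
            resume.put(tp, divergence.orElse(fallbackOffset)));
        return resume;
    }

    public static void main(String[] args) {
        TopicPartition known = new TopicPartition("events", 0);
        TopicPartition unknown = new TopicPartition("events", 1);
        try {
            // Simulates poll(Duration) raising the new exception.
            throw new TruncatedPartitionException(Map.of(
                known, Optional.of(42L),
                unknown, Optional.<Long>empty()));
        } catch (OffsetOutOfRangeException e) {
            // A handler written against the parent type still catches the new exception.
            if (e instanceof TruncatedPartitionException) {
                System.out.println(resumeOffsets((TruncatedPartitionException) e, 0L));
            }
        }
    }
}
```

Because the new type is a subclass, existing handlers for `OffsetOutOfRangeException` keep working, while newer code can inspect `truncatedOffsets()` to decide where to resume.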

The leader epoch will be exposed in the ConsumerRecord and OffsetAndMetadata objects.

Code Block
class ConsumerRecord<K, V> {
    int leaderEpoch();
}

class OffsetAndMetadata {
    int leaderEpoch();
}

We will also have a new API to support seeking to an offset and leader epoch. For convenience and consistency, we will reuse the OffsetAndMetadata object.

Code Block
/**
 * Seek to the nearest offset taking into account leader epoch information.
 * If the offset still exists and has the same leader epoch, then this will
 * set the position to that offset. Otherwise, if the log has been truncated
 * (e.g. due to unclean leader election), then the offset will be the first
 * offset of the next largest leader epoch.
 */
void seekToNearest(TopicPartition partition, OffsetAndMetadata offset);
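
The resolution rule in the comment above can be sketched against a simplified model of the log. This is an illustration only: `SeekToNearestSketch`, `nearestOffset`, and the representation of the log as a map from leader epoch to first offset are assumptions. The sketch reads "next largest leader epoch" as the smallest epoch greater than the requested one, and falls back to the log end offset when no such epoch exists, which is also an assumption.

```java
import java.util.Map;
import java.util.TreeMap;

public class SeekToNearestSketch {
    /**
     * @param epochStarts  first offset of each leader epoch present in the log
     * @param logEndOffset offset just past the last record in the log
     * @param offset       offset the consumer wants to seek to
     * @param epoch        leader epoch the consumer associated with that offset
     */
    static long nearestOffset(TreeMap<Integer, Long> epochStarts, long logEndOffset,
                              long offset, int epoch) {
        Long start = epochStarts.get(epoch);
        Map.Entry<Integer, Long> next = epochStarts.higherEntry(epoch);
        // An epoch ends where the next larger epoch begins, or at the log end.
        long end = (next != null) ? next.getValue() : logEndOffset;
        if (start != null && offset >= start && offset < end) {
            return offset; // offset still exists with the same leader epoch
        }
        // Truncation detected: move to the first offset of the next larger epoch.
        return end;
    }

    /** Example log: epoch 0 starts at 0, epoch 1 at 100, epoch 3 at 150. */
    static TreeMap<Integer, Long> exampleLog() {
        TreeMap<Integer, Long> log = new TreeMap<>();
        log.put(0, 0L);
        log.put(1, 100L);
        log.put(3, 150L);
        return log;
    }

    public static void main(String[] args) {
        TreeMap<Integer, Long> log = exampleLog();
        System.out.println(nearestOffset(log, 200L, 120L, 1)); // 120: still valid
        System.out.println(nearestOffset(log, 200L, 170L, 2)); // 150: epoch 2 was truncated away
        System.out.println(nearestOffset(log, 200L, 160L, 1)); // 150: epoch 1 now ends at 150
    }
}
```

In the second and third calls the consumer's view of the log diverged from the broker's at offset 150, so the position is moved back to the start of epoch 3.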

...