...
We will also have a new API to support seeking to an offset and leader epoch. This is required in order to support storage of offsets in an external store. When the consumer is initialized, the user will call seek() with the offset and leader epoch that was stored. The consumer will initialize the position of the consumer using this API.
...
Code Block |
---|
/** * Seek to an offset and initialize the leader epoch (if present). */ void seek(TopicPartition partition, OffsetAndMetadata offset); |
To make the external storage use case simpler, we will provide a helper to ConsumerRecords to get the next offsets. This simplifies commit logic for consumers which only commit offsets after consuming full batches.
Code Block |
---|
class ConsumerRecords<K, V> {
/**
* Get the next offsets that the consumer will consumer.
* This can be passed directly to a commitSync(), for example,
* after the full batch has been consumed. For finer-grained,
* offset tracking, you should use the offset information from
* the individual ConsumerRecord instances.
*/
Map<TopicPartition, OffsetAndMetadata> nextOffsets();
} |
Protocol Changes
Fetch
We will bump the fetch request version in order to include the current leader epoch. For older versions, we will skip epoch validation as before.
...
The new OffsetCommit request schema is provided below. A field for the leader epoch has been added. Note that this is the epoch of the previously fetched record. The response schema matches the previous version.
Code Block | ||
---|---|---|
| ||
OffsetCommitRequest => GroupId Generation MemberId [TopicName [Partition Offset Timestamp LeaderEpochLastLeaderEpoch Metadata]] GroupId => STRING Generation => INT32 MemberId => STRING RetentionTime => INT64 TopicName => STRING Partition => INT32 Offset => INT64 LeaderEpochLastLeaderEpoch => INT32 // New Metadata => STRING |
...
Code Block | ||
---|---|---|
| ||
OffsetFetchResponse => [TopicName [Partition Offset Timestamp LeaderEpochLastLeaderEpoch Metadata ErrorCode]] TopicName => STRING Partition => INT32 Offset => INT64 LeaderEpochLastLeaderEpoch => INT32 // New Metadata => STRING ErrorCode => INT16 |
...