...

We will also add a new API to support seeking to an offset and leader epoch. This is required to support storing offsets in an external store. When the consumer is initialized, the user calls seek() with the stored offset and leader epoch, and the consumer uses them to initialize its position.

...

Code Block
/**
 * Seek to an offset and initialize the leader epoch (if present).
 */ 
void seek(TopicPartition partition, OffsetAndMetadata offset);
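
The external-store flow described above can be sketched as follows. This is a minimal illustration, not the consumer implementation: TopicPartition and OffsetAndMetadata are simplified stand-ins for the client classes, and SeekableConsumer, ExternalOffsetStore, and PositionRestorer are hypothetical names introduced only for this example. It uses Java records, so it assumes Java 16 or later.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Simplified stand-ins for the Kafka client types; not the real classes.
record TopicPartition(String topic, int partition) {}
record OffsetAndMetadata(long offset, Optional<Integer> leaderEpoch) {}

// Hypothetical slice of the consumer exposing only the proposed seek().
interface SeekableConsumer {
    void seek(TopicPartition partition, OffsetAndMetadata offset);
}

// Hypothetical external store, modeled here as an in-memory map.
class ExternalOffsetStore {
    private final Map<TopicPartition, OffsetAndMetadata> saved = new HashMap<>();

    // Persist the offset together with the leader epoch (if known).
    void save(TopicPartition tp, long offset, Optional<Integer> leaderEpoch) {
        saved.put(tp, new OffsetAndMetadata(offset, leaderEpoch));
    }

    Optional<OffsetAndMetadata> load(TopicPartition tp) {
        return Optional.ofNullable(saved.get(tp));
    }
}

class PositionRestorer {
    // Seeks every assigned partition that has a stored position.
    // Returns the number of partitions whose position was restored.
    static int restore(SeekableConsumer consumer,
                       ExternalOffsetStore store,
                       List<TopicPartition> assigned) {
        int restored = 0;
        for (TopicPartition tp : assigned) {
            Optional<OffsetAndMetadata> position = store.load(tp);
            if (position.isPresent()) {
                consumer.seek(tp, position.get());
                restored++;
            }
        }
        return restored;
    }
}
```

Partitions without a stored position are left untouched in this sketch, so whatever default position logic the consumer has would still apply to them.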

To make the external storage use case simpler, we will add a helper to ConsumerRecords that returns the next offsets to consume. This streamlines commit logic for consumers that commit offsets only after consuming a full batch.

Code Block
class ConsumerRecords<K, V> {
  /**
   * Get the next offsets that the consumer will consume.
   * This can be passed directly to commitSync(), for example,
   * after the full batch has been consumed. For finer-grained
   * offset tracking, you should use the offset information from
   * the individual ConsumerRecord instances.
   */
  Map<TopicPartition, OffsetAndMetadata> nextOffsets();
}
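
One way to picture what the helper returns is sketched below. It assumes the next offset for a partition is the offset of the last record in the batch plus one, paired with that record's leader epoch; the real helper may instead be derived from the consumer's position. TopicPartition and OffsetAndMetadata are simplified stand-ins for the client classes, and FetchedRecord and NextOffsets are hypothetical names used only here. It uses Java records, so it assumes Java 16 or later.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Simplified stand-ins for the Kafka client types; not the real classes.
record TopicPartition(String topic, int partition) {}
record OffsetAndMetadata(long offset, Optional<Integer> leaderEpoch) {}

// Hypothetical, minimal view of a consumed record.
record FetchedRecord(TopicPartition partition, long offset, Optional<Integer> leaderEpoch) {}

class NextOffsets {
    // Assumes the records of each partition appear in increasing offset order,
    // so the last record seen for a partition determines its next offset.
    static Map<TopicPartition, OffsetAndMetadata> of(List<FetchedRecord> batch) {
        Map<TopicPartition, OffsetAndMetadata> next = new HashMap<>();
        for (FetchedRecord r : batch) {
            // Later records overwrite earlier ones for the same partition.
            next.put(r.partition(), new OffsetAndMetadata(r.offset() + 1, r.leaderEpoch()));
        }
        return next;
    }
}
```

A consumer that processes whole batches could pass a map of this shape to its commit call once the batch is done, without tracking per-record offsets itself.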


Protocol Changes

Fetch

We will bump the fetch request version in order to include the current leader epoch. For older versions, we will skip epoch validation as before.
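
The version gating described above might look roughly like the sketch below. FetchEpochValidator and the epochAwareVersion parameter are hypothetical, and this section does not say which errors the broker returns; the FENCED_LEADER_EPOCH and UNKNOWN_LEADER_EPOCH names are assumptions brought in from outside this section for illustration.

```java
import java.util.OptionalInt;

class FetchEpochValidator {
    // Result names are assumptions; this section does not define them.
    enum Result { OK, FENCED_LEADER_EPOCH, UNKNOWN_LEADER_EPOCH }

    // epochAwareVersion is hypothetical: the first fetch version that carries the epoch.
    static Result validate(int requestVersion,
                           int epochAwareVersion,
                           OptionalInt requestEpoch,
                           int brokerEpoch) {
        // Older request versions carry no epoch, so validation is skipped as before.
        if (requestVersion < epochAwareVersion || !requestEpoch.isPresent()) {
            return Result.OK;
        }
        int epoch = requestEpoch.getAsInt();
        if (epoch < brokerEpoch) {
            return Result.FENCED_LEADER_EPOCH;   // the client is behind the broker
        }
        if (epoch > brokerEpoch) {
            return Result.UNKNOWN_LEADER_EPOCH;  // the broker is behind the client
        }
        return Result.OK;
    }
}
```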

...

The new OffsetCommit request schema is provided below. A field for the leader epoch has been added. Note that this is the epoch of the previously fetched record. The response schema matches the previous version.

Code Block
OffsetCommitRequest => GroupId Generation MemberId [TopicName [Partition Offset Timestamp LastLeaderEpoch Metadata]]
  GroupId => STRING
  Generation => INT32
  MemberId => STRING
  RetentionTime => INT64
  TopicName => STRING
  Partition => INT32
  Offset => INT64
  LastLeaderEpoch => INT32  // New
  Metadata => STRING

...

Code Block
OffsetFetchResponse => [TopicName [Partition Offset Timestamp LastLeaderEpoch Metadata ErrorCode]]
  TopicName => STRING
  Partition => INT32
  Offset => INT64
  LastLeaderEpoch => INT32  // New
  Metadata => STRING
  ErrorCode => INT16

...