Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Consumer API

1) Add error INVALID_LEADERINTERNAL_EPOCHMETADATA. This will be a non-retriable error which may be thrown from consumer's API.

2) Add error INVALID_TOPIC_EPOCH. This will be a non-retriable error which may be thrown from consumer's API.

The corresponding exception is InvalidInternalMetadataException.

23) Add the following methods to the interface org.apache.kafka.clients.consumer.Consumer

Code Block
void seek(TopicPartition partition, long offset, String internalMetadata);
 
OffsetAndMetadata positionWithInternalMetadata(TopicPartition partition);

43) Add field internalMetadata to the class org.apache.kafka.clients.consumer.OffsetAndMetadata

...

When coordinator receives the OffsetCommitRequest, for each partition in the OffsetCommitRequest, it will additionally check whether the leader_epoch or topic_epoch in the request. If the topic_epoch from the OffsetCommitRequest is smaller than the last known value, the error for this partition in the OffsetCommitResponse is INVALID_topicINTERNAL_epochMETADATA. Otherwise, if the topic_epoch is the same but the leader_epoch from the OffsetCommitRequest is smaller than the last known value, the error for this partition in the OffsetCommitResponse is INVALIDis also INVALID_LEADERINTERNAL_EPOCHMETADATA.

If the client/broker library is old, both the leader_epoch and the topic_epoch are assumed to be -1 and broker will preserve the existing behavior without doing this additional check.

...

When initializing consumer, user should read the externally stored internal_metadata together with the offset. Then user should call the newly added API seek(partition, offset, internalMetadata) to seek to the previous offset. Later when consumer.poll() is called, the consumer needs to repeatedly refresh metadata if a metadata is considered outdated, which is determined using the similar criteria as introduced in the above subsection "Client's metadata refresh". The only difference is that, instead of using the leader_epoch and topic_epoch from the cached metadata, the consumer uses the values that extracted from the internalMetadata provided by the seek(...).

If the internalMetadata string from seek(...) can not be parsed or interpreted by Kafka client implementation, the InvalidInternalMetadataException is thrown from seek(...).

If user calls existing API seek(partition, offset) and there is not committed offset/leader_epoch/topic_epoch from the Kafka offset topic, both the leader_epoch and the topic_epoch are assumed to be -1 and the client will preserve the existing behavior without doing the additional check.

...