...

Code Block
MetadataResponse => throttle_time_ms brokers cluster_id controller_id topic_metadata 
  throttle_time_ms => int32
  brokers => [MetadataBroker]
  cluster_id => nullable_str
  controller_id => int32
  topic_metadata => [TopicMetadata]
 
TopicMetadata => topic_error_code topic is_internal topic_epoch partition_metadata
  topic_error_code => int16
  topic => str
  is_internal => boolean
  topic_epoch => int32            <-- NEW
  partition_metadata => [PartitionMetadata]
 
PartitionMetadata => partition_error_code partition_id leader replicas leader_epoch isr offline_replicas
  partition_error_code => int16
  partition_id => int32
  leader => int32
  replicas => [int32]
  leader_epoch => int32           <-- NEW
  isr => [int32]
  offline_replicas => [int32]
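
To illustrate how a client can use the two new epoch fields, below is a minimal sketch of the staleness check (per the criteria in the subsection "Client's metadata refresh"): received metadata is considered outdated if its topic_epoch is smaller than the cached topic_epoch, or if the topic_epoch is unchanged but the leader_epoch is smaller. All names in this sketch are illustrative, not actual consumer internals.

Code Block
// Sketch only: class and parameter names are illustrative.
public final class MetadataFreshness {

  // Returns true if the received epochs are older than the cached epochs,
  // in which case the client should keep its cached metadata and refresh again.
  public static boolean isOutdated(int cachedTopicEpoch, int cachedLeaderEpoch,
                                   int receivedTopicEpoch, int receivedLeaderEpoch) {
    if (receivedTopicEpoch < cachedTopicEpoch)
      return true;
    return receivedTopicEpoch == cachedTopicEpoch
        && receivedLeaderEpoch < cachedLeaderEpoch;
  }
}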

...

Code Block
OFFSET_COMMIT_VALUE_SCHEMA => offset topic_epoch leader_epoch metadata commit_timestamp expire_timestamp 
  offset => int64
  topic_epoch => int32            <-- NEW
  leader_epoch => int32           <-- NEW
  metadata => str
  commit_timestamp => int64
  expire_timestamp => int64

Consumer API

1) Add error INVALID_INTERNAL_METADATA. This will be a non-retriable error which may be thrown from the consumer's API. The corresponding exception is InvalidInternalMetadataException.

2) Add the following methods to the interface org.apache.kafka.clients.consumer.Consumer

Code Block
void seek(TopicPartition partition, long offset, OffsetEpoch offsetEpoch);
 
OffsetAndMetadata positionWithOffsetEpoch(TopicPartition partition);

3) Add field offsetEpoch to the class org.apache.kafka.clients.consumer.OffsetAndMetadata

Code Block
public class OffsetAndMetadata {
 
  // This field encodes leader_epoch and topic_epoch. User should not need to interpret this value.
  private final OffsetEpoch offsetEpoch;
 
  .... // Other existing variables and methods
 
}
 
// This class will encode leader_epoch and topic_epoch. It can be evolved to encode more information in the future.
public class OffsetEpoch {

  static OffsetEpoch decode(byte[] bytes);

  public byte[] encode();

  public String toString();
 
  private int topicEpoch();
 
  private int leaderEpoch();
 
  private OffsetEpoch(int topicEpoch, int leaderEpoch);

}

 

We can evolve the format of this field in the future as needed. It should only be used by the Kafka client implementation, and users should not need to interpret this value.
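
For illustration, the round trip a user is expected to perform with this class, using only the public methods above (the offsetEpoch variable is assumed to have been obtained from positionWithOffsetEpoch(...)):

Code Block
// The encoded bytes are the only representation the user should persist;
// decode(...) restores the OffsetEpoch without the user interpreting the bytes.
byte[] persisted = offsetEpoch.encode();
OffsetEpoch restored = OffsetEpoch.decode(persisted);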

...

When the consumer commits offsets, it includes the leader_epoch and topic_epoch together with the offset in the OffsetCommitRequest. The leader_epoch and topic_epoch will then be written in the message to the offset topic.

When the coordinator receives an OffsetCommitRequest, it will additionally check the leader_epoch and topic_epoch for each partition in the request. If the topic_epoch from the OffsetCommitRequest is smaller than the last known value, the error for this partition in the OffsetCommitResponse is INVALID_INTERNAL_METADATA. Otherwise, if the topic_epoch is the same but the leader_epoch from the OffsetCommitRequest is smaller than the last known value, the error for this partition in the OffsetCommitResponse is also INVALID_INTERNAL_METADATA.

If the client/broker library is old, both the leader_epoch and the topic_epoch are assumed to be -1 and the broker will preserve the existing behavior without doing this additional check.
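
The per-partition check can be summarized with the following sketch; the class, method, and lastKnown* names are illustrative rather than actual broker internals, and INVALID_INTERNAL_METADATA is the error added by this proposal.

Code Block
// Sketch of the coordinator-side check; names are illustrative.
public final class OffsetCommitEpochCheck {

  enum Result { NONE, INVALID_INTERNAL_METADATA }

  // lastKnown* are the epochs most recently written to the offset topic for
  // this partition; request* come from the OffsetCommitRequest. Epochs of -1
  // indicate an old client/broker library, in which case the check is skipped.
  public static Result check(int lastKnownTopicEpoch, int lastKnownLeaderEpoch,
                             int requestTopicEpoch, int requestLeaderEpoch) {
    if (requestTopicEpoch == -1 && requestLeaderEpoch == -1)
      return Result.NONE;
    if (requestTopicEpoch < lastKnownTopicEpoch)
      return Result.INVALID_INTERNAL_METADATA;
    if (requestTopicEpoch == lastKnownTopicEpoch
        && requestLeaderEpoch < lastKnownLeaderEpoch)
      return Result.INVALID_INTERNAL_METADATA;
    return Result.NONE;
  }
}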

The leader_epoch included in the OffsetCommitRequest should be the largest leader_epoch of those already consumed messages whose offset < the commit offset. If no message has been consumed since consumer initialization, the leader_epoch from seek(...) or OffsetFetchResponse should be used. The topic_epoch should be the topic_epoch of the given topic in the latest metadata. The coordinator should extract these values from the OffsetCommitRequest and write them into the offset topic.
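
A sketch of the client-side bookkeeping this implies is shown below; the class and its methods are hypothetical names for this illustration, not existing consumer fields.

Code Block
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Hypothetical bookkeeping for the rule above; not actual consumer internals.
public final class ConsumedEpochTracker {

  // Largest leader_epoch seen among consumed records, per partition.
  private final Map<TopicPartition, Integer> maxConsumedLeaderEpoch = new HashMap<>();

  // Called for every consumed record whose offset is below the next commit offset.
  public void onRecordConsumed(TopicPartition tp, int recordLeaderEpoch) {
    maxConsumedLeaderEpoch.merge(tp, recordLeaderEpoch, Math::max);
  }

  // Epoch to include in the OffsetCommitRequest; falls back to the epoch from
  // seek(...) or the OffsetFetchResponse if nothing has been consumed yet.
  public int leaderEpochForCommit(TopicPartition tp, int seededLeaderEpoch) {
    return maxConsumedLeaderEpoch.getOrDefault(tp, seededLeaderEpoch);
  }
}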

 

6) Consumer rebalance or initialization using the offset and epoch from the Kafka offset topic

...

When committing offsets, the user should use the newly added API positionWithOffsetEpoch(...) to read the offsetEpoch (which encodes leader_epoch and topic_epoch) as well, and store it together with the offset. The leader_epoch should be the largest leader_epoch of those already consumed messages whose offset < position. If no message has been consumed since consumer initialization, the leader_epoch from seek(...) or OffsetFetchResponse should be used. The topic_epoch should be the topic_epoch of the given topic in the latest metadata.
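
For example, the commit side might look like the following sketch; externalStore and the offsetEpoch() accessor on OffsetAndMetadata are assumed names for this illustration, while positionWithOffsetEpoch(...) and encode() come from the API proposed above.

Code Block
// Sketch: persist the offset and the encoded offsetEpoch together.
// externalStore and offsetEpoch() are assumed names, not part of this proposal.
OffsetAndMetadata oam = consumer.positionWithOffsetEpoch(partition);
externalStore.save(partition, oam.offset(), oam.offsetEpoch().encode());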

When initializing the consumer, the user should read the externally stored offsetEpoch together with the offset. Then the user should call the newly added API seek(partition, offset, offsetEpoch) to seek to the previous offset. Later, when consumer.poll() is called, the consumer needs to repeatedly refresh metadata if the metadata is considered outdated, which is determined using similar criteria as introduced in the above subsection "Client's metadata refresh". The only difference is that, instead of using the leader_epoch and topic_epoch from the cached metadata, the consumer uses the values extracted from the offsetEpoch provided by seek(...).
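
The initialization side would then look like the following sketch; StoredPosition and externalStore are assumed names for whatever external system holds the offset and the encoded offsetEpoch.

Code Block
// Sketch: restore the offset and the encoded offsetEpoch, then seek.
// Only OffsetEpoch.decode(...) and the three-argument seek(...) come from the
// proposal above; the external store types are assumed for illustration.
StoredPosition pos = externalStore.load(partition);
consumer.seek(partition, pos.offset(), OffsetEpoch.decode(pos.offsetEpochBytes()));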

If the offsetEpoch provided to seek(...) can not be decoded by the Kafka client implementation, an IllegalArgumentException is thrown from seek(...).

If the user calls the existing API seek(partition, offset) and there is no committed offset/leader_epoch/topic_epoch from the Kafka offset topic, both the leader_epoch and the topic_epoch are assumed to be -1 and the client will preserve the existing behavior without doing the additional check.

...