Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
{
  "version" : int32,
  "topic_epoch" : int32,  // This is newly added
  "partitions" : {
    partition -> [int32]   // partition has type int32. This maps partition to list of replicas
    ..
  },
  "topic_epoch" : int32  // This is newly added
}

Request/Response protocol

...

Code Block
UpdateMetadataRequest => controller_id controller_epoch partition_states live_brokers
  controller_id => int32
  controller_epoch => int32
  partition_states => [UpdateMetadataRequestPartitionState]
  live_brokers => [UpdateMetadataRequestBroker]
 
UpdateMetadataRequestPartitionState => topic partition controller_epoch leader leader_epoch topic_epoch isr zk_version replicas offline_replicas
  topic => string
  partition => int32
  controller_epoch => int32
  leader => int32
  leader_epoch => int32
  topic_epoch => int32        <-- NEW
  isr => [int32]
  zk_version => int32
  replicas => [int32]
  offline_replicas => [int32]

 

2) Add leaderAdd topic_epoch and topicand leader_epoch to MetadataResponseepoch to MetadataResponse

Code Block
MetadataResponse => throttle_time_ms brokers cluster_id controller_id topic_metadata
  throttle_time_ms => int32
  brokers => [MetadatBroker]
  cluster_id => nullable_str
  controller_id => int32
  topic_metadata => [TopicMetadata]
 
TopicMetadata => topic_error_code topic is_internal topic_epoch topic_epoch partition_metadata
  topic_error_code => int16
  topic => str
  is_internal => boolean
  topic_epoch => int32            <-- NEW
  partition_metadata => [PartitionMetadata]
 
PartitionMetadata => partition_error_code partition_id leader replicas leader_epoch topic_epoch isr offline_replicas
  partition_error_code => int16
  partition_id => int32
  leader => int32
  replicas => [int32]
  leader_epoch => int32           <-- NEW
  isr => [int32]
  offline_replicas => [int32]

 

3) Add leadertopic_epoch field and topic_epoch to and leader_epoch to OffsetCommitRequest

Code Block
OffsetCommitRequest => group_id generation_id memeber_id retention_time topics
  group_id => str
  generation_id => int32
  member_id => str
  retention_time => int64
  topics => [OffsetCommitRequestTopic]
 
OffsetCommitRequestTopic => topic topic_epoch partitions
  topic => str
  topic_epoch => int32            <-- NEW
  partitions => [OffsetCommitRequestPartition]
 
OffsetCommitRequestPartition => partition offset leader_epoch topic_epoch metadata
  partition => int32
  offset => int64
  leader_epoch => int32           <-- NEW
  metadata => nullable_str

 

4) Add leadertopic_epoch and topicand leader_epoch to OffsetFetchResponse

Code Block
OffsetFetchResponse => throttle_time_ms response error_code
  throttle_time_ms => int32
  responses => [OffsetFetchResponseTopic]
  error_code => int16
 
OffsetFetchResponseTopic => topic topic_epoch partition_responses
  topic => str
  topic_epoch => int32            <-- NEW
  partition_responses => [OffsetFetchResponsePartition]
 
OffsetFetchResponsePartition => partition offset leader_epoch topic_epoch metadata error_code
  partition => int32
  offset => int64
  leader_epoch => int32           <-- NEW
  metadata => nullable_str
  error_code => int16
 

Offset topic schema

Add leadertopic_epoch and topicleader_epoch to the schema of the offset topic value.

Code Block
OFFSET_COMMIT_VALUE_SCHEMA => offset leadertopic_epoch topicleader_epoch metadata commit_timestamp expire_timestamp 
  offset => int64
  leadertopic_epoch => int32            <-- NEW
  topicleader_epoch => int32            <-- NEW
  metadata => str
  commit_timestamp => int64
  expire_timestamp => int64

...