...

Kafka is designed so that messages with the same key from the same producer are consumed in the same order in which they were produced. This guarantee is useful for applications that maintain local state per key. Under Kafka's current design, however, in-order delivery is not guaranteed when the partitions of a topic are expanded. This KIP proposes a design that preserves in-order message delivery even when the partitions of a topic are expanded.

 

Public Interfaces

Zookeeper

Update the znodes /brokers/topics/[topic]/partitions/[partition] to use the following JSON format:

Code Block
{
  "version" : int32,
  "partition_epoch" : int32,
  "leaderEpochAfterCreation" : {      <-- NEW. This represents a map from partition to leaderEpoch for lower partitions.
    int32 -> int32
    ...
  },
  "leaderEpochBeforeDeletion" : {     <-- NEW. This represents a map from partition to leaderEpoch for lower partitions.
    int32 -> int32
    ...
  }
}
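
As a rough illustration, a payload in the znode format above could be built and parsed as shown below. This is a minimal sketch, not Kafka's actual code: the helper names `encode_partition_znode` and `decode_partition_znode` are hypothetical, and encoding the int32 map keys as JSON strings is an assumption (JSON object keys must be strings, while the proposal only specifies int32 -> int32).

```python
import json

# The two NEW map fields from the znode format above.
EPOCH_MAP_FIELDS = ("leaderEpochAfterCreation", "leaderEpochBeforeDeletion")


def encode_partition_znode(version, partition_epoch, after_creation, before_deletion):
    """Hypothetical helper: build the JSON payload for a partition znode.

    after_creation and before_deletion map partition index -> leaderEpoch.
    Keys are stringified because JSON object keys must be strings
    (an assumption; the proposal does not say how the map is encoded).
    """
    return json.dumps({
        "version": version,
        "partition_epoch": partition_epoch,
        "leaderEpochAfterCreation": {str(p): e for p, e in after_creation.items()},
        "leaderEpochBeforeDeletion": {str(p): e for p, e in before_deletion.items()},
    }, sort_keys=True)


def decode_partition_znode(payload):
    """Parse the payload and restore integer keys in the two epoch maps."""
    data = json.loads(payload)
    for field in EPOCH_MAP_FIELDS:
        data[field] = {int(p): e for p, e in data[field].items()}
    return data
```

For example, `decode_partition_znode(encode_partition_znode(1, 0, {0: 5, 1: 7}, {}))` round-trips the two maps with integer keys restored.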

Protocol

1) Update LeaderAndIsrRequest to share a single topic field across all partitions of a topic, and add a partition_number field for each topic.

Code Block
LeaderAndIsrRequest => controller_id controller_epoch topic_states live_leaders
  controller_id => int32
  controller_epoch => int32
  topic_states => [LeaderAndIsrRequestTopicState]    <-- NEW. This field includes LeaderAndIsrRequestPartitionState
  live_leaders => [LeaderAndIsrRequestLiveLeader]

LeaderAndIsrRequestTopicState => topic partition_number partition_states
  topic => str                                       <-- This is moved from LeaderAndIsrRequestPartitionState.
  partition_number => int32                          <-- NEW. This is the total number of partitions of this topic.
  partition_states => [LeaderAndIsrRequestPartitionState]
 
LeaderAndIsrRequestPartitionState => partition controller_epoch leader leader_epoch isr zk_version replicas is_new_replica
  partition => int32
  controller_epoch => int32
  leader => int32
  leader_epoch => int32
  isr => [int32]
  zk_version => int32
  replicas => [int32]
  is_new_replica =>  boolean
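
The restructuring above can be pictured as regrouping a flat list of partition states, each repeating its topic name, into one entry per topic. The sketch below is only an illustration under assumptions: the function name `group_partition_states` and the use of plain dicts for the states are hypothetical and do not reflect Kafka's internal classes.

```python
from collections import defaultdict


def group_partition_states(flat_states, partition_numbers):
    """Regroup flat per-partition states into the per-topic layout.

    flat_states: list of dicts, each carrying "topic" plus the per-partition
        fields (the old layout, with the topic repeated in every entry).
    partition_numbers: dict mapping topic -> total number of partitions.
    """
    by_topic = defaultdict(list)
    for state in flat_states:
        # Drop the repeated topic name; it is stored once on the topic entry.
        by_topic[state["topic"]].append(
            {key: value for key, value in state.items() if key != "topic"}
        )
    return [
        {
            "topic": topic,
            "partition_number": partition_numbers[topic],
            "partition_states": states,
        }
        for topic, states in by_topic.items()
    ]
```

Topics appear in the output in the order in which they are first seen in `flat_states`.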


2) Update ProduceRequest to include undeleted_partition_count per topic.

Code Block
ProduceRequest => transactional_id acks timeout topic_data
  transactional_id => nullable_str
  acks => int16
  timeout => int32
  topic_data => [TopicData]
 
TopicData => topic undeleted_partition_count data
  topic => str
  undeleted_partition_count => int32    <-- NEW. This is the total number of undeleted partitions of this topic expected by the producer.
  data => [PartitionData]
 
PartitionData => partition record_set
  partition => int32
  record_set => Records

 

3) Update HeartbeatRequest to include fields need_position and position for relevant partitions.

Code Block
HeartbeatRequest => group_id generation_id member_id topics
  group_id => str
  generation_id => int32
  member_id => str
  topics => [HeartbeatRequestTopic]               <-- NEW
 
HeartbeatRequestTopic => topic partitions
  topic => str                                    <-- NEW
  partitions => [HeartbeatRequestPartition]       <-- NEW
 
HeartbeatRequestPartition => partition need_position position
  partition => int32         <-- NEW
  need_position => boolean   <-- NEW. If true, HeartbeatResponse should include the position of this partition of the group.
  position => int64          <-- NEW. Position of this partition of this consumer.

 

4) Update HeartbeatResponse to include fields need_position and position for relevant partitions.

Code Block
HeartbeatResponse => throttle_time_ms topics error_code
  throttle_time_ms => int32
  topics => [HeartbeatResponseTopic]                 <-- NEW
  error_code => int16
 
HeartbeatResponseTopic => topic partitions
  topic => str                                       <-- NEW
  partitions => [HeartbeatResponsePartition]         <-- NEW
 
HeartbeatResponsePartition => partition need_position position
  partition => int32          <-- NEW
  need_position => boolean    <-- NEW. If true, HeartbeatRequest should include the position of this partition of this consumer.
  position => int64           <-- NEW. Position of this partition of the group.
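
The request/response pairing of need_position and position can be sketched roughly as follows for the partitions of one topic. This is a hedged illustration, not the coordinator's actual logic: the function name `handle_heartbeat`, the `wanted` parameter (the partitions whose position the coordinator wants reported next), and the `-1` sentinel for "no position supplied" are all assumptions that the schemas above do not define.

```python
UNKNOWN_POSITION = -1  # assumed sentinel; the proposal does not define one


def handle_heartbeat(request_partitions, group_positions, wanted):
    """Hypothetical coordinator-side handling for the partitions of one topic.

    request_partitions: list of dicts with "partition", "need_position",
        and "position", mirroring HeartbeatRequestPartition.
    group_positions: dict mapping partition -> position known for the group.
    wanted: set of partitions whose position the consumer should report
        in its next HeartbeatRequest.
    """
    response = []
    for req in request_partitions:
        partition = req["partition"]
        if req["need_position"]:
            # The consumer asked for the group's position of this partition.
            position = group_positions.get(partition, UNKNOWN_POSITION)
        else:
            position = UNKNOWN_POSITION
        response.append({
            "partition": partition,
            "need_position": partition in wanted,
            "position": position,
        })
    return response
```

Each returned dict mirrors one HeartbeatResponsePartition entry.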

 

5) Add LeaderEpochsForPartitionRequest and LeaderEpochsForPartitionResponse.

See 1) and 5) in the Proposed Changes section for how this request and response are used.

Code Block
LeaderEpochsForPartitionRequest => topics
  topics => [LeaderEpochsForPartitionRequestTopic]
 
LeaderEpochsForPartitionRequestTopic => topic partitions
  topic => str
  partitions => [int32]

Code Block
LeaderEpochsForPartitionResponse => throttle_time_ms topics
  throttle_time_ms => int32
  topics => [LeaderEpochsForPartitionResponseTopic]
 
LeaderEpochsForPartitionResponseTopic => topic partitions
  topic => str
  partitions => [LeaderEpochsForPartitionResponsePartition]
 
LeaderEpochsForPartitionResponsePartition => partition leader_epoch
  partition => int32
  leader_epoch => int32
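
To make the request schema concrete, the sketch below serializes a LeaderEpochsForPartitionRequest body using Kafka's standard primitive encodings (big-endian integers, strings as an int16 length followed by UTF-8 bytes, arrays as an int32 count followed by the elements). The function names are hypothetical, the request header is omitted, and no API key is assumed, since none of these is specified in this section.

```python
import struct


def encode_string(value):
    """Encode a Kafka protocol string: int16 length, then UTF-8 bytes."""
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_leader_epochs_request(topics):
    """Encode the request body only (no request header).

    topics: dict mapping topic name -> list of partition ids.
    """
    out = struct.pack(">i", len(topics))  # topics array count
    for topic, partitions in topics.items():
        out += encode_string(topic)
        out += struct.pack(">i", len(partitions))  # partitions array count
        out += struct.pack(">%di" % len(partitions), *partitions)
    return out
```

For a single topic named `t` with partitions 0 and 2, the body is 19 bytes: 4 for the topic count, 3 for the topic name, 4 for the partition count, and 8 for the two partition ids.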
 

 

...