...
Kafka is designed to allow messages with the same key from the same producer to be consumed in the same order as they were produced. This property is useful for applications that maintain local state per key. However, in the current design of Kafka, this in-order delivery is not guaranteed if the number of partitions of the topic is increased: because the key-to-partition mapping depends on the partition count, some keys start being written to a different partition after the expansion. This KIP proposes a design that supports in-order message delivery even when the partitions of a topic are expanded.
Public Interfaces
Zookeeper
Update the znodes /brokers/topics/[topic]/partitions/[partition] to use the following JSON format:
```
{
  "version" : int32,
  "partition_epoch" : int32,
  "priorLeaderEpochs" : {    <-- NEW. This represents a map from each smaller partition index to the leaderEpoch of that partition.
    int32 -> int32
    ...
  }
}
```
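As an illustration, the sketch below parses a hypothetical znode payload in this format. JSON object keys must be strings, so it assumes the int32 -> int32 map is stored with the partition index as a string key; the concrete values and the helper name `parse_partition_znode` are made up for this example. It also treats a missing priorLeaderEpochs field as an empty map, which is an assumption about znodes written before this change.

```python
import json

# Hypothetical znode content for one partition. JSON keys must be strings,
# so the int32 -> int32 map is assumed to use string partition indexes.
ZNODE_DATA = """
{
  "version": 1,
  "partition_epoch": 3,
  "priorLeaderEpochs": {"0": 5, "1": 7}
}
"""


def parse_partition_znode(raw: str) -> dict:
    """Parse the znode JSON and convert priorLeaderEpochs to an int -> int map.

    A missing priorLeaderEpochs field (assumed for older znodes) becomes {}.
    """
    data = json.loads(raw)
    prior = data.get("priorLeaderEpochs", {})
    data["priorLeaderEpochs"] = {int(p): int(e) for p, e in prior.items()}
    return data


state = parse_partition_znode(ZNODE_DATA)
print(state["priorLeaderEpochs"])  # {0: 5, 1: 7}
```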
Protocol
1) Update LeaderAndIsrRequest to group partition states by topic, so that the topic field is stated once for all partitions of that topic, and add the field partition_number for each topic.
```
LeaderAndIsrRequest => controller_id controller_epoch topic_states live_leaders
  controller_id => int32
  controller_epoch => int32
  topic_states => [LeaderAndIsrRequestTopicState]   <-- NEW. This field includes LeaderAndIsrRequestPartitionState.
  live_leaders => [LeaderAndIsrRequestLiveLeader]

LeaderAndIsrRequestTopicState => topic partition_number partition_states
  topic => str   <-- This is moved from LeaderAndIsrRequestPartitionState.
  partition_number => int32   <-- NEW. This is the total number of partitions of this topic.
  partition_states => [LeaderAndIsrRequestPartitionState]

LeaderAndIsrRequestPartitionState => partition controller_epoch leader leader_epoch isr zk_version replicas is_new_replica
  partition => int32
  controller_epoch => int32
  leader => int32
  leader_epoch => int32
  isr => [int32]
  zk_version => int32
  replicas => [int32]
  is_new_replica => boolean
```
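A minimal sketch of the restructuring, assuming the controller starts from a flat list of (topic, partition state) pairs, which is how the topic is carried per partition before this change. The class and function names (`PartitionState`, `TopicState`, `group_by_topic`) and the `partition_counts` input are hypothetical and only mirror the schema fields above; this is not Kafka's controller code.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class PartitionState:
    # Mirrors LeaderAndIsrRequestPartitionState; note there is no topic field.
    partition: int
    controller_epoch: int
    leader: int
    leader_epoch: int
    isr: List[int]
    zk_version: int
    replicas: List[int]
    is_new_replica: bool = False


@dataclass
class TopicState:
    # Mirrors LeaderAndIsrRequestTopicState.
    topic: str
    partition_number: int  # total number of partitions of the topic
    partition_states: List[PartitionState] = field(default_factory=list)


def group_by_topic(
    flat: List[Tuple[str, PartitionState]],
    partition_counts: Dict[str, int],
) -> List[TopicState]:
    """Group (topic, state) pairs so each topic name is sent only once.

    partition_counts (hypothetical input) maps each topic to its total
    partition count, which fills the new partition_number field.
    """
    grouped: Dict[str, List[PartitionState]] = defaultdict(list)
    for topic, state in flat:
        grouped[topic].append(state)
    # Dicts preserve insertion order, so topics keep their first-seen order.
    return [
        TopicState(topic, partition_counts[topic], states)
        for topic, states in grouped.items()
    ]


# Example: two partitions of "orders" and one of "payments".
flat = [
    ("orders", PartitionState(0, 1, 101, 4, [101, 102], 7, [101, 102])),
    ("payments", PartitionState(0, 1, 102, 2, [102], 3, [102, 103])),
    ("orders", PartitionState(1, 1, 102, 4, [102, 101], 7, [102, 101])),
]
topic_states = group_by_topic(flat, {"orders": 2, "payments": 1})
```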
2) Update ProduceRequest to include partition_number per topic.
```
ProduceRequest => transactional_id acks timeout topic_data
  transactional_id => nullable_str
  acks => int16
  timeout => int32
  topic_data => [TopicProduceData]

TopicProduceData => topic partition_number data
  topic => str
  partition_number => int32   <-- NEW. This is the total number of partitions of this topic expected by the producer.
  data => [PartitionData]

PartitionData => partition record_set
  partition => int32
  record_set => Records
```
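Why the producer's view of the partition count matters can be sketched as follows: a key is mapped to hash(key) modulo the partition count, so expanding a topic moves some keys to a new partition. The sketch uses `zlib.crc32` as a stand-in hash (Kafka's default partitioner uses murmur2, so real assignments differ), and `producer_view_is_stale` is a hypothetical illustration of how a broker could compare partition_number against its own count, not behavior specified in this section.

```python
import zlib


def partition_for_key(key: bytes, partition_number: int) -> int:
    """Map a key to a partition by hashing modulo the partition count.

    zlib.crc32 is a stand-in hash for illustration only; Kafka's default
    partitioner uses murmur2.
    """
    if partition_number <= 0:
        raise ValueError("partition_number must be positive")
    return zlib.crc32(key) % partition_number


def producer_view_is_stale(producer_partition_number: int,
                           broker_partition_number: int) -> bool:
    """Hypothetical broker-side check: True if the producer routed keys using
    a partition count that differs from the broker's current count."""
    return producer_partition_number != broker_partition_number


# After expanding a topic from 2 to 4 partitions, a key either stays on
# partition p or moves to p + 2. Records for a moved key then live on two
# partitions, so a consumer may read newer records before older ones.
for key in [b"user-1", b"user-2", b"user-3"]:
    before, after = partition_for_key(key, 2), partition_for_key(key, 4)
    print(key, before, after)
```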
3) Update HeartbeatRequest to include the fields need_position and position for the relevant partitions.
```
HeartbeatRequest => group_id generation_id member_id topics
  group_id => str
  generation_id => int32
  member_id => str
  topics => [HeartbeatRequestTopic]   <-- NEW

HeartbeatRequestTopic => topic partitions
  topic => str   <-- NEW
  partitions => [HeartbeatRequestPartition]   <-- NEW

HeartbeatRequestPartition => partition need_position position
  partition => int32   <-- NEW
  need_position => boolean   <-- NEW. If true, HeartbeatResponse should include the position of this partition of the group.
  position => int64   <-- NEW. Position of this partition of this consumer.
```
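A sketch of how a consumer might assemble the new nested fields, assuming it knows which partitions it has a position to report for and which partitions it needs the group's position for. The dataclasses mirror the schema above; the helper `build_heartbeat` and the use of -1 as the position when the consumer has none to report are assumptions for illustration.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple

TopicPartition = Tuple[str, int]


@dataclass
class HeartbeatRequestPartition:
    partition: int
    need_position: bool  # ask the coordinator for the group's position
    position: int        # this consumer's position (-1 = none, an assumption)


@dataclass
class HeartbeatRequestTopic:
    topic: str
    partitions: List[HeartbeatRequestPartition] = field(default_factory=list)


@dataclass
class HeartbeatRequest:
    group_id: str
    generation_id: int
    member_id: str
    topics: List[HeartbeatRequestTopic] = field(default_factory=list)


def build_heartbeat(
    group_id: str,
    generation_id: int,
    member_id: str,
    positions: Dict[TopicPartition, int],
    need_positions: Set[TopicPartition],
) -> HeartbeatRequest:
    """Hypothetical helper: include every partition that either has a
    position to report or needs the group's position, grouped by topic."""
    by_topic: Dict[str, List[HeartbeatRequestPartition]] = {}
    for tp in sorted(set(positions) | need_positions):
        topic, partition = tp
        by_topic.setdefault(topic, []).append(
            HeartbeatRequestPartition(
                partition=partition,
                need_position=tp in need_positions,
                position=positions.get(tp, -1),
            )
        )
    topics = [HeartbeatRequestTopic(t, ps) for t, ps in by_topic.items()]
    return HeartbeatRequest(group_id, generation_id, member_id, topics)


# The consumer reports its position for orders-0 and asks for orders-1.
req = build_heartbeat("my-group", 5, "consumer-1", {("orders", 0): 42}, {("orders", 1)})
```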
4) Update HeartbeatResponse to include the fields need_position and position for the relevant partitions.
```
HeartbeatResponse => throttle_time_ms topics error_code
  throttle_time_ms => int32
  topics => [HeartbeatResponseTopic]   <-- NEW
  error_code => int16

HeartbeatResponseTopic => topic partitions
  topic => str   <-- NEW
  partitions => [HeartbeatResponsePartition]   <-- NEW

HeartbeatResponsePartition => partition need_position position
  partition => int32   <-- NEW
  need_position => boolean   <-- NEW. If true, HeartbeatRequest should include the position of this partition of this consumer.
  position => int64   <-- NEW. Position of this partition of the group.
```
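The two need_position flags work in opposite directions: in the request the consumer asks the coordinator for a position, and in the response the coordinator asks the consumer to report one. The hypothetical `GroupPositionTracker` below sketches that exchange on the coordinator side, assuming the group's position of a partition is simply the latest position reported by the member that owns it and that -1 means unknown; neither assumption is stated in this section.

```python
from typing import Dict, Set, Tuple

TopicPartition = Tuple[str, int]


class GroupPositionTracker:
    """Hypothetical coordinator-side bookkeeping for the need_position exchange."""

    def __init__(self) -> None:
        self._positions: Dict[TopicPartition, int] = {}  # latest reported position
        self._wanted: Set[TopicPartition] = set()        # positions some member asked for

    def handle_heartbeat(
        self,
        owned: Set[TopicPartition],
        reported: Dict[TopicPartition, int],
        needed: Set[TopicPartition],
    ) -> Tuple[Dict[TopicPartition, int], Set[TopicPartition]]:
        """Process one heartbeat from a member.

        owned:    partitions assigned to this member
        reported: request entries carrying this member's position
        needed:   request entries with need_position = true
        Returns (positions to put in the response, partitions for which the
        response sets need_position = true so the member reports them next).
        """
        for tp, pos in reported.items():
            self._positions[tp] = pos
            self._wanted.discard(tp)  # this request is satisfied for now
        self._wanted |= needed
        # -1 signals "no position known yet" (an assumption of this sketch).
        positions = {tp: self._positions.get(tp, -1) for tp in needed}
        ask_for = self._wanted & owned
        return positions, ask_for


tracker = GroupPositionTracker()
# Member B (owns t-1) asks for the group's position of t-0, owned by member A.
first = tracker.handle_heartbeat({("t", 1)}, {}, {("t", 0)})
```

In a follow-up heartbeat, member A is asked to report t-0; once it does, member B's next request for t-0 receives that position.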
5) Add LeaderEpochsForPartitionRequest and LeaderEpochsForPartitionResponse. See 1) and 5) in the Proposed Changes section for how this request and response are used.
```
LeaderEpochsForPartitionRequest => topics
  topics => [LeaderEpochsForPartitionRequestTopic]

LeaderEpochsForPartitionRequestTopic => topic partitions
  topic => str
  partitions => [int32]
```
```
LeaderEpochsForPartitionResponse => throttle_time_ms topics
  throttle_time_ms => int32
  topics => [LeaderEpochsForPartitionResponseTopic]

LeaderEpochsForPartitionResponseTopic => topic partitions
  topic => str
  partitions => [LeaderEpochsForPartitionResponsePartition]

LeaderEpochsForPartitionResponsePartition => partition leader_epoch
  partition => int32
  leader_epoch => int32
```
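A minimal sketch of how a broker could assemble LeaderEpochsForPartitionResponse from the request, assuming it already has a lookup from (topic, partition) to the current leader epoch. The function name `build_leader_epochs_response`, the dict-based representation, and returning -1 for a partition with no known epoch (the schema has no error field) are all assumptions.

```python
from typing import Dict, List, Tuple


def build_leader_epochs_response(
    request_topics: List[Tuple[str, List[int]]],
    leader_epochs: Dict[Tuple[str, int], int],
    throttle_time_ms: int = 0,
) -> dict:
    """Mirror LeaderEpochsForPartitionResponse as nested dicts.

    request_topics: (topic, [partition, ...]) pairs from the request.
    leader_epochs:  hypothetical broker lookup of the current leader epoch.
    A partition with no known epoch gets -1 (an assumption of this sketch).
    """
    topics = []
    for topic, partitions in request_topics:
        topics.append({
            "topic": topic,
            "partitions": [
                {"partition": p, "leader_epoch": leader_epochs.get((topic, p), -1)}
                for p in partitions
            ],
        })
    return {"throttle_time_ms": throttle_time_ms, "topics": topics}


resp = build_leader_epochs_response([("orders", [0, 1])], {("orders", 0): 5})
```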
...