Status
Current state: Under Discussion
Discussion thread:
JIRA:
Motivation
Kafka is designed to allow messages with the same key from the same producer to be consumed in the same order as they are produced. This feature is useful for applications which maintain local state per key. However, in the current design of Kafka, this in-order delivery is not guaranteed if we expand the partitions of the topic. This KIP proposes a design to support in-order message delivery even if the partitions of the topic are expanded.
Public Interfaces
Zookeeper
Update the znode /brokers/topics/[topic]/partitions/[partition] to use the following JSON format:
{ "version" : int32, "partition_epoch" : int32 "priorLeaderEpochs" : { <-- NEW. This represents a map from smaller partition index to its leaderEpoch. int32 -> int32 ... } }
Protocol
1) Update LeaderAndIsrRequest to re-use the topic field for all its partitions and add a partition_number field for each topic.
LeaderAndIsrRequest => controller_id controller_epoch topic_states live_leaders
  controller_id => int32
  controller_epoch => int32
  topic_states => [LeaderAndIsrRequestTopicState]    <-- NEW. This field includes LeaderAndIsrRequestPartitionState.
  live_leaders => [LeaderAndIsrRequestLiveLeader]

LeaderAndIsrRequestTopicState => topic partition_number partition_states
  topic => str                                       <-- This is moved from LeaderAndIsrRequestPartitionState.
  partition_number => int32                          <-- NEW. This is the total number of partitions of this topic.
  partition_states => [LeaderAndIsrRequestPartitionState]

LeaderAndIsrRequestPartitionState => partition controller_epoch leader leader_epoch isr zk_version replicas is_new_replica
  partition => int32
  controller_epoch => int32
  leader => int32
  leader_epoch => int32
  isr => [int32]
  zk_version => int32
  replicas => [int32]
  is_new_replica => boolean
2) Update ProduceRequest to include partition_number per topic.
ProduceRequest => transactional_id acks timeout topic_data
  transactional_id => nullable_str
  acks => int16
  timeout => int32
  topic_data => [TopicProduceData]

TopicProduceData => topic partition_number data
  topic => str
  partition_number => int32    <-- NEW. This is the total number of partitions of this topic expected by the producer.
  data => [PartitionData]

PartitionData => partition record_set
  partition => int32
  record_set => Records
3) Update HeartbeatRequest to include fields need_position and position for relevant partitions.
HeartbeatRequest => group_id generation_id member_id topics
  group_id => str
  generation_id => int32
  member_id => str
  topics => [HeartbeatRequestTopic]              <-- NEW

HeartbeatRequestTopic => topic partitions
  topic => str                                   <-- NEW
  partitions => [HeartbeatRequestPartition]      <-- NEW

HeartbeatRequestPartition => partition need_position position
  partition => int32         <-- NEW
  need_position => boolean   <-- NEW. If true, the HeartbeatResponse should include the position of this partition of the group.
  position => int64          <-- NEW. Position of this partition of this consumer.
4) Update HeartbeatResponse to include fields need_position and position for relevant partitions.
HeartbeatResponse => throttle_time_ms topics error_code
  throttle_time_ms => int32
  topics => [HeartbeatResponseTopic]             <-- NEW
  error_code => int16

HeartbeatResponseTopic => topic partitions
  topic => str                                   <-- NEW
  partitions => [HeartbeatResponsePartition]     <-- NEW

HeartbeatResponsePartition => partition need_position position
  partition => int32         <-- NEW
  need_position => boolean   <-- NEW. If true, the next HeartbeatRequest should include the position of this partition of this consumer.
  position => int64          <-- NEW. Position of this partition of the group.
5) Add LeaderEpochsForPartitionRequest and LeaderEpochsForPartitionResponse. See 1) and 5) in the Proposed Changes section for how this request and response are used.
LeaderEpochsForPartitionRequest => topics
  topics => [LeaderEpochsForPartitionRequestTopic]

LeaderEpochsForPartitionRequestTopic => topic partitions
  topic => str
  partitions => [int32]
LeaderEpochsForPartitionResponse => throttle_time_ms topics
  throttle_time_ms => int32
  topics => [LeaderEpochsForPartitionResponseTopic]

LeaderEpochsForPartitionResponseTopic => topic partitions
  topic => str
  partitions => [LeaderEpochsForPartitionResponsePartition]

LeaderEpochsForPartitionResponsePartition => partition leader_epoch
  partition => int32
  leader_epoch => int32
Proposed Changes
1) Changes in the controller for handling partition expansion
Here we describe how a topic znode change triggers the partition expansion logic in the controller:
- User uses kafka-topics.sh to update the topic znode with the new assignment. This triggers the topic znode listener in the controller.
- For those partitions of this topic which already have a partition znode, the controller increments their leaderEpoch by 1 in the partition znode. The controller then sends LeaderAndIsrRequest and waits for LeaderAndIsrResponse. The LeaderAndIsrRequest should include the new leaderEpoch for each partition and the new partition count for the topic.
- For each partition of this topic which does not yet have a partition znode, the controller creates the partition znode such that the znode data includes the map (priorPartition -> oldLeaderEpoch), where the priorPartitions are those partitions of this topic whose partition index is smaller than the given partition.
- The controller continues the existing logic of partition expansion.
Note that this procedure is fault tolerant: if the controller fails at any of these steps, the new controller can continue creating partition znodes following the same procedure. The whole procedure is sketched below.
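The following is a minimal sketch of the procedure above. All class and method names (incrementLeaderEpochInZk, sendLeaderAndIsrRequestAndAwait, createPartitionZnode, continueExistingExpansionLogic) are hypothetical stand-ins for existing controller and ZooKeeper utilities; only the ordering of the steps is meant to reflect this KIP.

import java.util.HashMap;
import java.util.Map;

// Sketch of the controller-side expansion procedure; helper methods are hypothetical.
abstract class PartitionExpansionSketch {

    void onPartitionExpansion(String topic, int oldCount, int newCount) {
        // Step 1: increment the leaderEpoch of every pre-existing partition in its
        // partition znode, remembering the epoch each partition had before the bump.
        Map<Integer, Integer> priorLeaderEpochs = new HashMap<>();
        for (int partition = 0; partition < oldCount; partition++) {
            int epochBeforeBump = incrementLeaderEpochInZk(topic, partition);
            priorLeaderEpochs.put(partition, epochBeforeBump);
        }

        // Step 2: send LeaderAndIsrRequest carrying the new leaderEpochs and the new
        // partition count, and wait for the LeaderAndIsrResponse.
        sendLeaderAndIsrRequestAndAwait(topic, newCount);

        // Step 3: create the znode of each new partition, embedding the
        // (priorPartition -> oldLeaderEpoch) map.
        for (int partition = oldCount; partition < newCount; partition++) {
            createPartitionZnode(topic, partition, priorLeaderEpochs);
        }

        // Step 4: continue with the existing partition-expansion logic.
        continueExistingExpansionLogic(topic);
    }

    /** Increments the leaderEpoch by 1 in the partition znode and returns the epoch before the bump (hypothetical). */
    abstract int incrementLeaderEpochInZk(String topic, int partition);

    abstract void sendLeaderAndIsrRequestAndAwait(String topic, int partitionCount);

    abstract void createPartitionZnode(String topic, int partition, Map<Integer, Integer> priorLeaderEpochs);

    abstract void continueExistingExpansionLogic(String topic);
}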
2) Changes in how broker handles ProduceRequest
- When the broker receives a LeaderAndIsrRequest, in addition to the existing procedure (e.g. updating the leaderEpochCache for the new leaderEpoch), the broker should record the latest partition count for each topic.
- When the broker receives a ProduceRequest, for each partition in the request, the broker checks whether the partition count in the request is the same as the partition count from the most recent LeaderAndIsrRequest. If yes, the broker handles the produce request in the current way. If no, the broker rejects this partition with InvalidPartitionCountException. This error extends InvalidMetadataException and should trigger the producer to update its metadata and retry. A sketch of this check is shown below.
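A minimal sketch of this check, assuming a simplified in-memory map of partition counts (the class, field and method names below are hypothetical; only InvalidPartitionCountException and its relation to InvalidMetadataException come from this KIP):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the broker-side validation of the partition count carried in a ProduceRequest.
class ProducePartitionCountCheck {

    // Latest partition count per topic, as learned from the most recent LeaderAndIsrRequest.
    private final Map<String, Integer> latestPartitionCount = new ConcurrentHashMap<>();

    void onLeaderAndIsrRequest(String topic, int partitionCount) {
        latestPartitionCount.put(topic, partitionCount);
    }

    void validateProduce(String topic, int partitionCountFromProducer) {
        Integer expected = latestPartitionCount.get(topic);
        if (expected == null || expected != partitionCountFromProducer) {
            // An InvalidMetadataException subclass makes the producer refresh its metadata and
            // retry, as described above. RuntimeException is used here only to keep the sketch
            // self-contained.
            throw new InvalidPartitionCountException("Producer assumed " + partitionCountFromProducer
                + " partitions for topic " + topic + " but the broker expects " + expected);
        }
        // Otherwise handle the produce request as today.
    }

    static class InvalidPartitionCountException extends RuntimeException {
        InvalidPartitionCountException(String message) {
            super(message);
        }
    }
}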
3) Changes in how producer constructs ProduceRequest
- Producer should include the partition count for each topic in the ProduceRequest.
4) Changes in how consumer handles HeartbeatRequest and HeartbeatResponse
- The HeartbeatRequest includes the current position for each partition that the coordinator requested in the previous HeartbeatResponse. It also includes the list of partitions for which the consumer wants to know the position (i.e. the position of the consumer that is currently consuming that partition).
- The group coordinator remembers the positions for those partitions which are of interest to some consumers of the given group.
- The HeartbeatResponse includes the positions of the requested partitions based on the most recent HeartbeatRequests from consumers of the group. It also includes the list of partitions which are of interest to some consumers of the given group. The bookkeeping on the coordinator is sketched below.
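The bookkeeping on the group coordinator could look roughly like the sketch below. All names are hypothetical and partitions are identified by plain strings for brevity; in practice the coordinator would only set need_position for partitions assigned to the consumer that is heartbeating.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of the per-group position bookkeeping on the group coordinator.
class GroupPositionTracker {

    // Partitions whose position some consumer of the group has asked for (need_position = true).
    private final Set<String> interestingPartitions = new HashSet<>();

    // Most recently reported position per interesting partition, taken from HeartbeatRequests.
    private final Map<String, Long> latestPositions = new HashMap<>();

    // Called for every HeartbeatRequest: store the positions reported by this consumer and
    // remember which partitions it wants to know the position of.
    void onHeartbeatRequest(Map<String, Long> reportedPositions, Set<String> wantedPartitions) {
        latestPositions.putAll(reportedPositions);
        interestingPartitions.addAll(wantedPartitions);
    }

    // Positions to put into the HeartbeatResponse for the partitions this consumer asked about.
    Map<String, Long> positionsFor(Set<String> wantedPartitions) {
        Map<String, Long> result = new HashMap<>();
        for (String topicPartition : wantedPartitions) {
            Long position = latestPositions.get(topicPartition);
            if (position != null)
                result.put(topicPartition, position);
        }
        return result;
    }

    // Partitions for which the coordinator sets need_position = true in the HeartbeatResponse,
    // so that their owners report positions in their next HeartbeatRequest.
    Set<String> partitionsNeedingPosition() {
        return interestingPartitions;
    }
}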
5) Changes in how consumer consumes partitions
- The consumer receives SyncGroupResponse, which contains its assigned partitions.
- The consumer gets the startPosition, i.e. the committed offset, for its assigned partitions.
- For each partition whose startPosition is 0, the consumer sends LeaderEpochsForPartitionRequest to the coordinator to get the map (priorPartition -> oldLeaderEpoch), which the broker can read from the corresponding partition znode. Then the consumer sends OffsetsForLeaderEpochRequest to convert (priorPartition -> oldLeaderEpoch) into (priorPartition -> offsetThreshold), where offsetThreshold is the last offset of messages published under oldLeaderEpoch for the given priorPartition.
- For each partition whose startPosition is 0, the consumer includes its list of priorPartitions in the HeartbeatRequest and gets the corresponding positions of these partitions of the consumer group in the HeartbeatResponse. The consumer only starts to consume this partition if, for every priorPartition, the position >= the offsetThreshold of that priorPartition. This check is sketched below.
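A minimal sketch of the final check, assuming the offsetThresholds map has already been derived via OffsetsForLeaderEpochRequest and the priorPositions map has been obtained from the HeartbeatResponse (all names are hypothetical):

import java.util.Map;

// Sketch of the check deciding whether a consumer may start consuming a newly created
// partition whose startPosition is 0.
class NewPartitionGate {

    // offsetThresholds: priorPartition -> last offset produced under the old leaderEpoch
    // priorPositions:   priorPartition -> current position of the group, from the HeartbeatResponse
    static boolean canStartConsuming(Map<Integer, Long> offsetThresholds, Map<Integer, Long> priorPositions) {
        for (Map.Entry<Integer, Long> entry : offsetThresholds.entrySet()) {
            Long position = priorPositions.get(entry.getKey());
            // If the position of a prior partition is unknown or behind its threshold,
            // the consumer must keep waiting before consuming the new partition.
            if (position == null || position < entry.getValue())
                return false;
        }
        return true;
    }
}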
Compatibility, Deprecation, and Migration Plan
The KIP changes the inter-broker protocol. Therefore the migration requires two rolling bounces. In the first rolling bounce we will deploy the new code, but brokers will still communicate using the existing protocol. In the second rolling bounce we will change the config so that brokers will start to communicate with each other using the new protocol.
Rejected Alternatives
Future work