Kafka is designed so that messages with the same key from the same producer are consumed in the order in which they were produced. This is useful for applications that maintain local state per key. However, under Kafka's current design, this in-order delivery is not guaranteed when a topic's partitions are expanded. Furthermore, for a topic whose traffic fluctuates significantly over time, it is useful to be able to add partitions when the expected byte-in rate is high and delete partitions when it is low. This KIP proposes a design that allows partition expansion and deletion while still ensuring in-order delivery of keyed messages.
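
To make the ordering problem concrete, here is a minimal sketch. It assumes the producer picks a partition as hash(key) mod partition count; `KeyPartitioning`, `partitionFor`, and the use of `Objects.hashCode` are illustrative stand-ins, not Kafka's actual partitioner.

```java
import java.util.Objects;

final class KeyPartitioning {
    // Assumption: the producer maps a key to hash(key) mod partitionCount.
    // Objects.hashCode stands in for the producer's real hash function.
    static int partitionFor(Object key, int partitionCount) {
        return Math.floorMod(Objects.hashCode(key), partitionCount);
    }

    public static void main(String[] args) {
        Integer key = 5; // Integer.hashCode() returns the value itself
        System.out.println(partitionFor(key, 2)); // prints 1
        System.out.println(partitionFor(key, 3)); // prints 2
    }
}
```

Under this assumption, expanding the topic from 2 to 3 partitions sends new messages for the same key to a different partition, while older messages for that key may still be unconsumed in the original one. Nothing then forces a consumer group to process them in production order.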


This KIP allows an arbitrary sequence of partition expansions and deletions on an existing topic while still ensuring in-order delivery of keyed messages, with one exception: a user cannot expand the partitions of an existing topic while that topic still has a partition marked for deletion. Once the size of such a partition reaches zero, due to either retention or AdminClient.deleteRecords(...), the partition is removed from the topic and the user can expand the topic's partitions again.

In the future, the work in this KIP can be extended to support partition expansion even while the topic still has a partition marked for deletion.

Public Interfaces


1) Update the znodes /brokers/topics/[topic] to use the following JSON format:

Code Block
{
  "version" : int32,
  "partitions" : {
    partition -> replicaList
  },
  "undeleted_partition_count" : int32  <-- NEW. This is the partition_count used by producer for choosing partitions.
}


2) Update the znodes /brokers/topics/[topic]/partitions/[partition] to use the following JSON format:

Code Block
{
  "version" : int32,
  "partition_epoch" : int32,
  "leaderEpochAfterCreation" : {      <-- NEW. This represents a map from partition to leaderEpoch for lower partitions.
    int32 -> int32
  },
  "leaderEpochBeforeDeletion" : {     <-- NEW. This represents a map from partition to leaderEpoch for lower partitions.
    int32 -> int32
  }
}
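
As an illustration of the topic znode in 1), the sketch below models its contents in memory. It assumes that partitions numbered undeleted_partition_count and above are the ones marked for deletion, which this section does not state explicitly. `TopicZnodeState` and its methods are hypothetical names. `canExpand` mirrors the rule that expansion is blocked while any partition is still marked for deletion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TopicZnodeState {
    final Map<Integer, List<Integer>> partitions; // partition -> replica list
    final int undeletedPartitionCount;

    TopicZnodeState(Map<Integer, List<Integer>> partitions, int undeletedPartitionCount) {
        this.partitions = new TreeMap<>(partitions); // sorted by partition id
        this.undeletedPartitionCount = undeletedPartitionCount;
    }

    // Assumption: ids >= undeleted_partition_count are marked for deletion.
    List<Integer> partitionsMarkedForDeletion() {
        List<Integer> marked = new ArrayList<>();
        for (int partition : partitions.keySet()) {
            if (partition >= undeletedPartitionCount) {
                marked.add(partition);
            }
        }
        return marked;
    }

    // Expansion is allowed only when no partition is still marked for deletion.
    boolean canExpand() {
        return partitionsMarkedForDeletion().isEmpty();
    }
}
```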


1) Update LeaderAndIsrRequest to share a single topic field across all of a topic's partitions and to add an undeleted_partition_count field for each topic.

Code Block
LeaderAndIsrRequest => controller_id controller_epoch topic_states live_leaders
  controller_id => int32
  controller_epoch => int32
  topic_states => [LeaderAndIsrRequestTopicState]    <-- NEW. This field includes LeaderAndIsrRequestPartitionState
  live_leaders => [LeaderAndIsrRequestLiveLeader]

LeaderAndIsrRequestTopicState => topic undeleted_partition_count partition_states
  topic => str                                       <-- This is moved from LeaderAndIsrRequestPartitionState.
  undeleted_partition_count => int32                 <-- NEW. This is the number of undeleted partitions of this topic.
  partition_states => [LeaderAndIsrRequestPartitionState]
LeaderAndIsrRequestPartitionState => partition controller_epoch leader leader_epoch isr zk_version replicas is_new_replica
  partition => int32
  controller_epoch => int32
  leader => int32
  leader_epoch => int32
  isr => [int32]
  zk_version => int32
  replicas => [int32]
  is_new_replica => boolean
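
The restructuring above moves the topic name out of each partition entry and into an enclosing per-topic entry. A minimal sketch of that regrouping follows. `TopicStateGrouping` and `FlatPartitionState` are hypothetical names, and only the topic and partition id are modeled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class TopicStateGrouping {
    /** Hypothetical flat entry: the previous layout repeats the topic per partition. */
    static final class FlatPartitionState {
        final String topic;
        final int partition;

        FlatPartitionState(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    /** Groups partition ids under their topic so each topic name appears once. */
    static Map<String, List<Integer>> groupByTopic(List<FlatPartitionState> flat) {
        Map<String, List<Integer>> grouped = new LinkedHashMap<>();
        for (FlatPartitionState state : flat) {
            grouped.computeIfAbsent(state.topic, t -> new ArrayList<>()).add(state.partition);
        }
        return grouped;
    }
}
```

Grouping by topic also gives per-topic fields such as undeleted_partition_count a natural place to live, since they are stored once per topic rather than once per partition.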

2) Update ProduceRequest to include undeleted_partition_count per topic.

Code Block
ProduceRequest => transactional_id acks timeout topic_data
  transactional_id => nullable_str
  acks => int16
  timeout => int32
  topic_data => [TopicData]
TopicData => topic undeleted_partition_count data
  topic => str
  undeleted_partition_count => int32    <-- NEW. This is the number of undeleted partitions of this topic expected by the producer.
  data => [PartitionData]
PartitionData => partition record_set
  partition => int32
  record_set => Records


3) Update HeartbeatRequest to include fields need_position and position for relevant partitions.

Code Block
HeartbeatRequest => group_id generation_id member_id topics
  group_id => str
  generation_id => int32
  member_id => str
  topics => [HeartbeatRequestTopic]               <-- NEW
HeartbeatRequestTopic => topic partitions
  topic => str                                    <-- NEW
  partitions => [HeartbeatRequestPartition]       <-- NEW
HeartbeatRequestPartition => partition need_position position
  partition => int32         <-- NEW
  need_position => boolean   <-- NEW. If true, HeartbeatResponse should include the position of this partition of the group.
  position => int64          <-- NEW. Position of this partition of this consumer.


4) Update HeartbeatResponse to include fields need_position and position for relevant partitions.

Code Block
HeartbeatResponse => throttle_time_ms topics error_code
  throttle_time_ms => int32
  topics => [HeartbeatResponseTopic]                 <-- NEW
  error_code => int16
HeartbeatResponseTopic => topic partitions
  topic => str                                       <-- NEW
  partitions => [HeartbeatResponsePartition]         <-- NEW
HeartbeatResponsePartition => partition need_position position
  partition => int32          <-- NEW
  need_position => boolean    <-- NEW. If true, HeartbeatRequest should include the position of this partition of this consumer.
  position => int64           <-- NEW. Position of this partition of the group.
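
The need_position and position fields form a small request/response exchange. The sketch below shows one possible coordinator-side handling for a single topic: each partition the consumer asks about with need_position = true is answered with the group's position. `HeartbeatPositions`, `PartitionEntry`, and the -1 placeholder for an unknown position are assumptions made for illustration and are not specified in this section.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class HeartbeatPositions {
    // Assumption: -1 marks a position the coordinator does not know.
    static final long UNKNOWN_POSITION = -1L;

    /** Mirrors the partition, need_position and position fields of one entry. */
    static final class PartitionEntry {
        final int partition;
        final boolean needPosition;
        final long position;

        PartitionEntry(int partition, boolean needPosition, long position) {
            this.partition = partition;
            this.needPosition = needPosition;
            this.position = position;
        }
    }

    /** Answers every requested entry that has need_position = true with the group's position. */
    static List<PartitionEntry> respond(List<PartitionEntry> requested,
                                        Map<Integer, Long> groupPositions) {
        List<PartitionEntry> response = new ArrayList<>();
        for (PartitionEntry entry : requested) {
            if (entry.needPosition) {
                long position = groupPositions.getOrDefault(entry.partition, UNKNOWN_POSITION);
                response.add(new PartitionEntry(entry.partition, false, position));
            }
        }
        return response;
    }
}
```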


5) Add PartitionLeaderEpochsForPartitionsRequest and PartitionLeaderEpochsForPartitionsResponse. PartitionLeaderEpochsForPartitionsResponse essentially encodes the leaderEpochAfterCreation and the leaderEpochBeforeDeletion map for those partitions specified in the PartitionLeaderEpochsForPartitionsRequest.

Code Block
PartitionLeaderEpochsForPartitionsRequest => topics
  topics => [PartitionLeaderEpochsForPartitionsRequestTopic]
PartitionLeaderEpochsForPartitionsRequestTopic => topic partitions
  topic => str
  partitions => [int32]

Code Block
PartitionLeaderEpochsForPartitionsResponse => throttle_time_ms topics
  throttle_time_ms => int32
  topics => [PartitionLeaderEpochsForPartitionsResponseTopic]
PartitionLeaderEpochsForPartitionsResponseTopic => topic partitions
  topic => str
  partitions => [PartitionLeaderEpochsForPartitionsResponsePartition]
PartitionLeaderEpochsForPartitionsResponsePartition => partition leader_epoch_after_creation leader_epoch_before_deletion
  partition => int32
  leader_epoch_after_creation => int32   // -1 if the given partition is not in leaderEpochAfterCreation of the partition znode.
  leader_epoch_before_deletion => int32  // -1 if the given partition is not in leaderEpochBeforeDeletion of the partition znode.
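
The -1 convention in the response can be sketched as a lookup against the two maps stored in a single partition znode. `LeaderEpochLookup` and `Entry` are hypothetical names. The maps stand for leaderEpochAfterCreation and leaderEpochBeforeDeletion as read from that znode.

```java
import java.util.Map;

final class LeaderEpochLookup {
    static final int NOT_PRESENT = -1; // value used when a map has no entry

    /** Mirrors one PartitionLeaderEpochsForPartitionsResponsePartition entry. */
    static final class Entry {
        final int partition;
        final int leaderEpochAfterCreation;
        final int leaderEpochBeforeDeletion;

        Entry(int partition, int leaderEpochAfterCreation, int leaderEpochBeforeDeletion) {
            this.partition = partition;
            this.leaderEpochAfterCreation = leaderEpochAfterCreation;
            this.leaderEpochBeforeDeletion = leaderEpochBeforeDeletion;
        }
    }

    /** Builds the response entry for one requested partition, using -1 for missing keys. */
    static Entry lookup(Map<Integer, Integer> afterCreation,
                        Map<Integer, Integer> beforeDeletion,
                        int partition) {
        return new Entry(
            partition,
            afterCreation.getOrDefault(partition, NOT_PRESENT),
            beforeDeletion.getOrDefault(partition, NOT_PRESENT));
    }
}
```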


6) Update UpdateMetadataRequest to share a single topic field across all of a topic's partitions and to add an undeleted_partition_count field for each topic.

Code Block
UpdateMetadataRequest => controller_id controller_epoch max_partition_epoch topic_states live_brokers
  controller_id => int32
  controller_epoch => int32
  max_partition_epoch => int32
  topic_states => [UpdateMetadataRequestTopicState]
  live_brokers => [UpdateMetadataRequestBroker]
UpdateMetadataRequestTopicState => topic undeleted_partition_count partition_states
  topic => str
  undeleted_partition_count => int32                       <-- NEW
  partition_states => [UpdateMetadataRequestPartitionState]
UpdateMetadataRequestPartitionState => partition controller_epoch leader leader_epoch partition_epoch isr zk_version replicas offline_replicas
  partition => int32
  controller_epoch => int32
  leader => int32
  leader_epoch => int32
  partition_epoch => int32
  isr => [int32]
  zk_version => int32
  replicas => [int32]
  offline_replicas => [int32]

7) Add field undeleted_partition_count for each topic in MetadataResponse.

Code Block
MetadataResponse => throttle_time_ms max_partition_epoch brokers cluster_id controller_id topic_metadata
  throttle_time_ms => int32
  max_partition_epoch => int32
  brokers => [MetadataBroker]
  cluster_id => nullable_str
  controller_id => int32
  topic_metadata => [TopicMetadata]

TopicMetadata => topic_error_code topic undeleted_partition_count is_internal partition_metadata
  topic_error_code => int16
  topic => str
  undeleted_partition_count => int32               <-- NEW
  is_internal => boolean
  partition_metadata => [PartitionMetadata]
PartitionMetadata => partition_error_code partition_id leader replicas leader_epoch partition_epoch isr offline_replicas
  partition_error_code => int16
  partition_id => int32
  leader => int32
  replicas => [int32]
  leader_epoch => int32
  partition_epoch => int32
  isr => [int32]
  offline_replicas => [int32]


Consumer API

1) Add the following method to the interface org.apache.kafka.clients.consumer.Consumer, together with the new PartitionKeyRebalanceListener interface it accepts.

Code Block
public void subscribe(Collection<String> topics, ConsumerRebalanceListener consumerRebalanceListener, PartitionKeyRebalanceListener partitionKeyRebalanceListener);

public interface PartitionKeyRebalanceListener {
  /** This callback allows the user to flush state related to the keys previously received in the given set of partitions before another consumer loads state for such keys and starts to consume messages with these keys. This can happen after partition creation or deletion for an existing topic. */
  void onPartitionKeyMaybeRevoked(Collection<TopicPartition> partitions);

  /** This callback allows the user to load state for new keys that may be received in the given set of partitions. This can happen after partition creation or deletion for an existing topic. */
  void onPartitionKeyAssigned(Collection<TopicPartition> partitions);
}
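
A possible implementation of the two callbacks is sketched below. To stay self-contained it declares its own stand-in for TopicPartition and a local copy of the proposed interface. `FlushingListener` and `durableStore` are hypothetical, and state is tracked per partition rather than per key to keep the sketch short.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class KeyStateListenerSketch {
    /** Stand-in for org.apache.kafka.common.TopicPartition. */
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override public boolean equals(Object o) {
            return o instanceof TopicPartition
                && ((TopicPartition) o).topic.equals(topic)
                && ((TopicPartition) o).partition == partition;
        }

        @Override public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    /** Local copy of the interface proposed in this section. */
    interface PartitionKeyRebalanceListener {
        void onPartitionKeyMaybeRevoked(Collection<TopicPartition> partitions);
        void onPartitionKeyAssigned(Collection<TopicPartition> partitions);
    }

    /** Keeps per-key counters in memory; durableStore is a hypothetical external store. */
    static final class FlushingListener implements PartitionKeyRebalanceListener {
        final Map<TopicPartition, Map<String, Long>> localState = new HashMap<>();
        final Map<TopicPartition, Map<String, Long>> durableStore;

        FlushingListener(Map<TopicPartition, Map<String, Long>> durableStore) {
            this.durableStore = durableStore;
        }

        @Override
        public void onPartitionKeyMaybeRevoked(Collection<TopicPartition> partitions) {
            // Flush and drop local state so another consumer can load it.
            for (TopicPartition tp : partitions) {
                Map<String, Long> state = localState.remove(tp);
                if (state != null) {
                    durableStore.put(tp, state);
                }
            }
        }

        @Override
        public void onPartitionKeyAssigned(Collection<TopicPartition> partitions) {
            // Load previously flushed state for keys that may now arrive here.
            for (TopicPartition tp : partitions) {
                Map<String, Long> loaded = new HashMap<>();
                Map<String, Long> saved = durableStore.get(tp);
                if (saved != null) {
                    loaded.putAll(saved);
                }
                localState.put(tp, loaded);
            }
        }
    }
}
```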



