...

Kafka is designed to allow messages with the same key from the same producer to be consumed in the same order as they are produced. This feature is useful for applications that maintain local state per key. However, in the current design of Kafka, this in-order delivery is not guaranteed if the number of partitions of the topic is increased. Furthermore, for a topic whose traffic fluctuates significantly over time, it would be useful to be able to add partitions when the expected byte-in rate is high and delete partitions when the expected byte-in rate is low. This KIP proposes a design that allows partition expansion and deletion while still ensuring in-order delivery for keyed messages.
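Expansion breaks per-key ordering because a keyed message's partition is derived from a hash of its key modulo the partition count. The sketch below illustrates this. It uses `String.hashCode()` as a simplified stand-in for the murmur2 hash that Kafka's default partitioner applies to the serialized key bytes. The class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: shows how a key's partition can change when the partition count changes. */
class KeyRemapping {

    // Simplified stand-in for Kafka's default partitioner, which hashes the serialized
    // key with murmur2; String.hashCode() is used here for brevity.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Returns the keys whose partition differs between the two partition counts.
    static List<String> movedKeys(List<String> keys, int oldCount, int newCount) {
        List<String> moved = new ArrayList<>();
        for (String key : keys) {
            if (partitionFor(key, oldCount) != partitionFor(key, newCount)) {
                moved.add(key);
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            keys.add("key-" + i);
        }
        List<String> moved = movedKeys(keys, 4, 5);
        // Newer messages for a moved key land in a partition that may be read by a different
        // consumer, possibly before older messages in the old partition have been consumed.
        System.out.println(moved.size() + " of " + keys.size() + " keys change partition going from 4 to 5 partitions");
    }
}
```

Every moved key is a key whose messages can be consumed out of order across the expansion. The rest of this KIP is about closing that gap.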

Goals

This KIP allows an arbitrary sequence of partition expansions and deletions on an existing topic while still ensuring in-order message delivery for keyed messages, with one exception: the user cannot expand the partitions of an existing topic while any partition of that topic is still marked for deletion. Once the size of such a partition reaches zero, due to either retention or AdminClient.deleteRecords(...), the partition is removed from the topic and the user can expand the partitions of the topic again.

In the future, we can build on this KIP to support partition expansion even while some partitions of the topic are still marked for deletion.
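The expansion/deletion rule can be modelled as a small state machine. This is a hypothetical sketch under the following assumptions, none of which come from the text:

- Partitions are numbered 0..n-1.
- Deletion marks the highest-numbered partitions.
- Partitions with id >= the undeleted count stay in the topic until they are empty.

For simplicity, the sketch also assumes that all marked partitions become empty at the same time.

```java
/**
 * Hypothetical model of the expansion/deletion rule in the Goals section.
 * Assumption: partitions with id >= undeletedPartitionCount are marked for deletion
 * and keep existing until they are empty.
 */
class TopicPartitionCounts {
    private int totalPartitions;          // partitions that still physically exist
    private int undeletedPartitionCount;  // partitions that producers may write to

    TopicPartitionCounts(int initialPartitionCount) {
        if (initialPartitionCount <= 0) {
            throw new IllegalArgumentException("need at least one partition");
        }
        this.totalPartitions = initialPartitionCount;
        this.undeletedPartitionCount = initialPartitionCount;
    }

    boolean hasPartitionsMarkedForDeletion() {
        return totalPartitions > undeletedPartitionCount;
    }

    /** Expansion is rejected while any partition is still marked for deletion. */
    void expandTo(int newCount) {
        if (hasPartitionsMarkedForDeletion()) {
            throw new IllegalStateException("cannot expand while partitions are marked for deletion");
        }
        if (newCount <= totalPartitions) {
            throw new IllegalArgumentException("new partition count must exceed " + totalPartitions);
        }
        totalPartitions = newCount;
        undeletedPartitionCount = newCount;
    }

    /** Marks the highest-numbered partitions for deletion; they remain until empty. */
    void markForDeletion(int newUndeletedCount) {
        if (newUndeletedCount <= 0 || newUndeletedCount >= undeletedPartitionCount) {
            throw new IllegalArgumentException("invalid undeleted partition count " + newUndeletedCount);
        }
        undeletedPartitionCount = newUndeletedCount;
    }

    /**
     * Called once the marked partitions have size zero (via retention or deleteRecords).
     * Simplification: assumes all marked partitions became empty together.
     */
    void removeEmptyMarkedPartitions() {
        totalPartitions = undeletedPartitionCount;
    }

    int totalPartitions() { return totalPartitions; }

    int undeletedPartitionCount() { return undeletedPartitionCount; }
}
```

For example, a topic created with 4 partitions can be expanded to 6 and then reduced to 5 undeleted partitions. Expanding to 8 is then rejected until `removeEmptyMarkedPartitions()` runs.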

 

This KIP provides the following guarantee with respect to the interface that our producer and consumer expose to the user. Suppose two messages with the same key, messageA and messageB, are produced by the same producer, and messageB is produced after messageA but to a different partition. Then the following events are guaranteed to happen in this order:
- messageA is delivered by its consumer to the application.
- The consumer of messageA can execute a callback, in which the user can flush state related to the key of messageA.
- The consumer of messageB can execute a callback, in which the user can load state related to the key of messageB.
- messageB is delivered by its consumer to the application.

Public Interfaces

Zookeeper

1) Update the znodes /brokers/topics/[topic] to use the following JSON format

Code Block
{
  "version" : int32,
  "partitions" : {
    partition -> replicaList
    ...
  },
  "initial_partition_count" : int32,   <-- NEW. This is the number of partitions when the topic is created.
  "undeleted_partition_count" : int32  <-- NEW. This is the partition count that producers use when choosing partitions.
}
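The following sketch shows how a client might use the two new fields. It is a hypothetical in-memory view of this znode and relies on two assumptions:

- Partition ids are contiguous from 0.
- Partitions with id >= undeleted_partition_count are the ones marked for deletion.

As before, `String.hashCode()` stands in for Kafka's murmur2 key hash.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/**
 * Hypothetical in-memory view of the /brokers/topics/[topic] znode; field names mirror the JSON.
 * Assumptions: partition ids are contiguous from 0, and partitions with
 * id >= undeletedPartitionCount are the ones marked for deletion.
 */
record TopicZnode(int version,
                  Map<Integer, List<Integer>> partitions, // partition -> replica list
                  int initialPartitionCount,
                  int undeletedPartitionCount) {

    TopicZnode {
        partitions = Collections.unmodifiableSortedMap(new TreeMap<>(partitions));
        if (undeletedPartitionCount <= 0 || undeletedPartitionCount > partitions.size()) {
            throw new IllegalArgumentException("undeleted_partition_count out of range: " + undeletedPartitionCount);
        }
    }

    /** Keyed partition choice over undeleted partitions only (String.hashCode stands in for murmur2). */
    int partitionForKey(String key) {
        return Math.floorMod(key.hashCode(), undeletedPartitionCount);
    }

    /** Partitions that still exist but no longer receive new keyed messages, in ascending order. */
    List<Integer> partitionsMarkedForDeletion() {
        return partitions.keySet().stream()
                .filter(p -> p >= undeletedPartitionCount)
                .collect(Collectors.toList());
    }
}
```

Hashing over undeleted_partition_count rather than the number of existing partitions means no new keyed message is routed to a partition that is draining toward deletion.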

 

2) Update the znodes /brokers/topics/[topic]/partitions/[partition]/state to use the following JSON format

Code Block
{
  "version" : int32,
  "partition_epoch" : int32,
  "leaderEpochAfterCreation" : {      <-- NEW. This represents a map from partition to leaderEpoch for lower partitions.
    int32 -> int32
    ...
  },
  "leaderEpochBeforeDeletion" : {     <-- NEW. This represents a map from partition to leaderEpoch for lower partitions.
    int32 -> int32
    ...
  }
}

Protocol

1) Update LeaderAndIsrRequest to share a single topic field among all partitions of a topic, and add the field undeleted_partition_count for each topic.

Code Block
LeaderAndIsrRequest => controller_id controller_epoch topic_states live_leaders
  controller_id => int32
  controller_epoch => int32
  topic_states => [LeaderAndIsrRequestTopicState]    <-- NEW. Each entry groups the LeaderAndIsrRequestPartitionState entries of one topic.
  live_leaders => [LeaderAndIsrRequestLiveLeader]

LeaderAndIsrRequestTopicState => topic undeleted_partition_count partition_states
  topic => str                                       <-- This is moved from LeaderAndIsrRequestPartitionState.
  undeleted_partition_count => int32                 <-- NEW. This is the number of undeleted partitions of this topic.
  partition_states => [LeaderAndIsrRequestPartitionState]

LeaderAndIsrRequestPartitionState => partition controller_epoch leader leader_epoch isr zk_version replicas is_new_replica
  partition => int32
  controller_epoch => int32
  leader => int32
  leader_epoch => int32
  isr => [int32]
  zk_version => int32
  replicas => [int32]
  is_new_replica => boolean
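The next sketch makes the restructuring concrete. It regroups flat per-partition states, each of which repeats its topic name, into the per-topic layout above. The Java types are hypothetical and carry only a subset of the fields. The sketch also assumes the controller already knows each topic's undeleted_partition_count.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical flat form: one entry per partition, with the topic name repeated each time. */
record FlatPartitionState(String topic, int partition, int leader, int leaderEpoch, List<Integer> isr) {}

/** Hypothetical per-partition state after the change: the topic name is no longer repeated. */
record PartitionState(int partition, int leader, int leaderEpoch, List<Integer> isr) {}

/** Hypothetical per-topic entry carrying the new undeleted_partition_count field. */
record TopicState(String topic, int undeletedPartitionCount, List<PartitionState> partitionStates) {}

class LeaderAndIsrGrouping {
    /**
     * Groups flat partition states by topic (preserving first-seen topic order) and attaches
     * each topic's undeleted_partition_count from the controller's metadata.
     */
    static List<TopicState> groupByTopic(List<FlatPartitionState> flat, Map<String, Integer> undeletedCounts) {
        Map<String, List<PartitionState>> byTopic = new LinkedHashMap<>();
        for (FlatPartitionState s : flat) {
            byTopic.computeIfAbsent(s.topic(), t -> new ArrayList<>())
                   .add(new PartitionState(s.partition(), s.leader(), s.leaderEpoch(), s.isr()));
        }
        List<TopicState> result = new ArrayList<>();
        for (Map.Entry<String, List<PartitionState>> e : byTopic.entrySet()) {
            Integer count = undeletedCounts.get(e.getKey());
            if (count == null) {
                throw new IllegalArgumentException("no undeleted_partition_count for topic " + e.getKey());
            }
            result.add(new TopicState(e.getKey(), count, e.getValue()));
        }
        return result;
    }
}
```

Besides making room for the per-topic count, grouping by topic avoids repeating the topic string once per partition on the wire.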


2) Update ProduceRequest to include undeleted_partition_count per topic.

Code Block
ProduceRequest => transactional_id acks timeout topic_data
  transactional_id => nullable_str
  acks => int16
  timeout => int32
  topic_data => [TopicProduceData]
 
TopicProduceData => topic undeleted_partition_count data
  topic => str
  undeleted_partition_count => int32    <-- NEW. This is the number of undeleted partitions of this topic expected by the producer.
  data => [PartitionData]
 
PartitionData => partition record_set
  partition => int32
  record_set => Records
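The text does not say here what the broker does with this field. This sketch assumes one plausible use: the broker rejects a request whose undeleted_partition_count differs from its own metadata. A producer holding stale metadata then refreshes it and recomputes the key's partition before the message is written. The result codes, class name and retry limit are made up for illustration.

```java
import java.util.function.IntSupplier;

/**
 * Hypothetical sketch: the broker rejects a produce request whose undeleted_partition_count
 * is stale, and the producer refreshes the count and re-partitions before retrying.
 */
class StaleCountRetry {
    // Made-up result codes for illustration; these are not real Kafka error codes.
    enum Result { OK, STALE_PARTITION_COUNT }

    /** Broker side: accept only if the producer's view matches the broker's metadata. */
    static Result brokerValidate(int requestCount, int brokerCount) {
        return requestCount == brokerCount ? Result.OK : Result.STALE_PARTITION_COUNT;
    }

    /**
     * Producer side: picks a partition with its cached count (String.hashCode stands in for
     * murmur2) and returns the partition that the broker accepted.
     */
    static int chooseAndSend(String key, int cachedCount, IntSupplier brokerCount, IntSupplier refreshMetadata) {
        int count = cachedCount;
        for (int attempt = 0; attempt < 3; attempt++) {
            int partition = Math.floorMod(key.hashCode(), count);
            if (brokerValidate(count, brokerCount.getAsInt()) == Result.OK) {
                return partition;
            }
            // Stale view: re-read undeleted_partition_count and recompute the partition.
            count = refreshMetadata.getAsInt();
        }
        throw new IllegalStateException("undeleted_partition_count still stale after retries");
    }
}
```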

 

3) Add ConsumerGroupPositionRequest, which allows a consumer to report its own positions for the partitions it consumes and to query the positions of partitions consumed by other consumers in its consumer group.

Code Block
ConsumerGroupPositionRequest => group_id generation_id member_id topics
  group_id => str
  generation_id => int32
  member_id => str
  topics => [ConsumerGroupPositionRequestTopic]
 
ConsumerGroupPositionRequestTopic => topic partitions
  topic => str
  partitions => [ConsumerGroupPositionRequestPartition]
 
ConsumerGroupPositionRequestPartition => partition need_position position
  partition => int32
  need_position => boolean // If true, ConsumerGroupPositionResponse should include the position of this partition of the group.
  position => int64        // Position of this partition of this consumer.

 

4) Add ConsumerGroupPositionResponse, which includes the fields need_position and position for the relevant partitions.

Code Block
ConsumerGroupPositionResponse => throttle_time_ms topics error_code
  throttle_time_ms => int32
  topics => [ConsumerGroupPositionResponseTopic]
  error_code => int16
 
ConsumerGroupPositionResponseTopic => topic partitions
  topic => str
  partitions => [ConsumerGroupPositionResponsePartition]
 
ConsumerGroupPositionResponsePartition => partition need_position position
  partition => int32
  need_position => boolean // If true, ConsumerGroupPositionRequest should include the position of this partition of this consumer
  position => int64        // Position of this partition of the group.
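This request/response pair lets a member publish its own positions and learn the positions of partitions owned by others. The sketch below shows one possible coordinator-side handling. It relies on these assumptions:

- There is a single topic.
- Generation and member validation are omitted.
- A position of -1 means "not reported" or "unknown".

Class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical coordinator logic for ConsumerGroupPositionRequest/Response (single topic). */
class GroupPositionCoordinator {
    record RequestPartition(int partition, boolean needPosition, long position) {}
    record ResponsePartition(int partition, boolean needPosition, long position) {}

    private final Map<Integer, Long> groupPositions = new HashMap<>(); // last reported positions
    private final Set<Integer> wanted = new HashSet<>();               // partitions some member asked about

    List<ResponsePartition> handle(List<RequestPartition> partitions) {
        List<ResponsePartition> response = new ArrayList<>();
        for (RequestPartition p : partitions) {
            if (p.needPosition()) {
                // Querying member: remember the interest and return the last known group position.
                wanted.add(p.partition());
                long known = groupPositions.getOrDefault(p.partition(), -1L);
                response.add(new ResponsePartition(p.partition(), false, known));
            } else {
                // Reporting member: store its position and tell it whether to keep reporting.
                if (p.position() >= 0) {
                    groupPositions.put(p.partition(), p.position());
                }
                long known = groupPositions.getOrDefault(p.partition(), -1L);
                response.add(new ResponsePartition(p.partition(), wanted.contains(p.partition()), known));
            }
        }
        return response;
    }
}
```

A member that asks about a partition before its owner has reported gets -1. After the owner reports, the owner is told to keep reporting, and the next query returns the stored position.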

 

5) Add PartitionLeaderEpochsForPartitionsRequest and PartitionLeaderEpochsForPartitionsResponse. PartitionLeaderEpochsForPartitionsResponse essentially encodes the leaderEpochAfterCreation and leaderEpochBeforeDeletion maps for the partitions specified in the PartitionLeaderEpochsForPartitionsRequest.

Code Block
PartitionLeaderEpochsForPartitionsRequest => topics
  topics => [PartitionLeaderEpochsForPartitionsRequestTopic]

PartitionLeaderEpochsForPartitionsRequestTopic => topic partitions
  topic => str
  partitions => [int32]


Code Block
PartitionLeaderEpochsForPartitionsResponse => throttle_time_ms topics
  throttle_time_ms => int32
  topics => [PartitionLeaderEpochsForPartitionsResponseTopic]
 
PartitionLeaderEpochsForPartitionsResponseTopic => topic partitions
  topic => str
  partitions => [PartitionLeaderEpochsForPartitionsResponsePartition]
 
PartitionLeaderEpochsForPartitionsResponsePartition => partition leader_epoch_after_creation leader_epoch_before_deletion
  partition => int32
  leader_epoch_after_creation => int32   // -1 if the given partition is not in leaderEpochAfterCreation of the partition znode.
  leader_epoch_before_deletion => int32  // -1 if the given partition is not in leaderEpochBeforeDeletion of the partition znode.
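The -1 convention in this response amounts to a map lookup with a default value. The sketch below takes the two maps as plain inputs. Which partition znode they are read from is left as an assumption, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical builder for PartitionLeaderEpochsForPartitionsResponse entries. */
class LeaderEpochLookup {
    record ResponsePartition(int partition, int leaderEpochAfterCreation, int leaderEpochBeforeDeletion) {}

    /** Partitions missing from either map are encoded as -1, as the response schema specifies. */
    static List<ResponsePartition> buildResponse(List<Integer> requestedPartitions,
                                                 Map<Integer, Integer> leaderEpochAfterCreation,
                                                 Map<Integer, Integer> leaderEpochBeforeDeletion) {
        List<ResponsePartition> out = new ArrayList<>();
        for (int p : requestedPartitions) {
            out.add(new ResponsePartition(p,
                    leaderEpochAfterCreation.getOrDefault(p, -1),
                    leaderEpochBeforeDeletion.getOrDefault(p, -1)));
        }
        return out;
    }
}
```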

 

6) Update UpdateMetadataRequest to share a single topic field among all partitions of a topic, and add the fields undeleted_partition_count and initial_partition_count for each topic.

Code Block
UpdateMetadataRequest => controller_id controller_epoch max_partition_epoch topic_states live_brokers
  controller_id => int32
  controller_epoch => int32
  max_partition_epoch => int32
  topic_states => [UpdateMetadataRequestTopicState]
  live_brokers => [UpdateMetadataRequestBroker]
 
UpdateMetadataRequestTopicState => topic initial_partition_count undeleted_partition_count partition_states
  topic => str
  initial_partition_count => int32      <-- NEW. This is the number of partitions when the topic is created.
  undeleted_partition_count => int32    <-- NEW. This is the number of undeleted partitions of this topic.
  partition_states => [UpdateMetadataRequestPartitionState]
 
UpdateMetadataRequestPartitionState => partition controller_epoch leader leader_epoch partition_epoch isr zk_version replicas offline_replicas
  partition => int32
  controller_epoch => int32
  leader => int32
  leader_epoch => int32
  partition_epoch => int32
  isr => [int32]
  zk_version => int32
  replicas => [int32]
  offline_replicas => [int32]


7) Add fields undeleted_partition_count and initial_partition_count for each topic in MetadataResponse.

Code Block
MetadataResponse => throttle_time_ms max_partition_epoch brokers cluster_id controller_id topic_metadata 
  throttle_time_ms => int32
  max_partition_epoch => int32
  brokers => [MetadataBroker]
  cluster_id => nullable_str
  controller_id => int32
  topic_metadata => [TopicMetadata]

TopicMetadata => topic_error_code topic initial_partition_count undeleted_partition_count is_internal partition_metadata
  topic_error_code => int16
  topic => str
  initial_partition_count => int32      <-- NEW. This is the number of partitions when the topic is created.
  undeleted_partition_count => int32    <-- NEW. This is the number of undeleted partitions of this topic, which producers use when choosing partitions.
  is_internal => boolean
  partition_metadata => [PartitionMetadata]
 
PartitionMetadata => partition_error_code partition_id leader replicas leader_epoch partition_epoch isr offline_replicas
  partition_error_code => int16
  partition_id => int32
  leader => int32
  replicas => [int32]
  leader_epoch => int32
  partition_epoch => int32
  isr => [int32]
  offline_replicas => [int32]

 

8) Add the field undeleted_partition_count for each topic in FetchRequest.

Code Block
FetchRequest => replica_id max_wait_time min_bytes max_bytes isolation_level session_id epoch topics forgotten_topics_data
  replica_id => int32
  max_wait_time => int32
  min_bytes => int32
  max_bytes => int32
  isolation_level => int8
  session_id => int32
  epoch => int32
  topics => [FetchRequestTopic]
  forgotten_topics_data => [ForgottenTopicData]

FetchRequestTopic => topic undeleted_partition_count partitions
  topic => str
  undeleted_partition_count => int32    <-- NEW. This is the number of undeleted partitions of this topic expected by the sender of this request.
  partitions => [FetchRequestPartition]

 

Consumer API

1) Add the following method to the interface org.apache.kafka.clients.consumer.Consumer, together with the new interface PartitionKeyRebalanceListener that it takes as an argument.

Code Block
public void subscribe(Collection<String> topics, ConsumerRebalanceListener consumerRebalanceListener, PartitionKeyRebalanceListener partitionKeyRebalanceListener);


public interface PartitionKeyRebalanceListener {

  /*
   *  This callback allows the user to flush state related to the keys previously received in the given set of partitions before another consumer loads state for such keys and starts to consume messages with these keys. This can happen after partition creation or deletion for an existing topic.
   */
  void onPartitionKeyMaybeRevoked(Collection<TopicPartition> partitions);

  /*
   *  This callback allows the user to load state for new keys that may be received in the given set of partitions. This can happen after partition creation or deletion for an existing topic.
   */
  void onPartitionKeyAssigned(Collection<TopicPartition> partitions);
}
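Here is one way an application might implement the proposed listener for a per-key counter. It is a minimal sketch under stated assumptions:

- The interface is redeclared locally because it is not part of any released Kafka client.
- A local `TopicPartition` record stands in for `org.apache.kafka.common.TopicPartition` so that the block compiles on its own.
- A shared `HashMap` stands in for whatever durable store (for example an external database) the application would really use.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/** Stand-in for org.apache.kafka.common.TopicPartition so the sketch compiles on its own. */
record TopicPartition(String topic, int partition) {}

/** The listener interface as proposed in this KIP (redeclared here; not in released clients). */
interface PartitionKeyRebalanceListener {
    void onPartitionKeyMaybeRevoked(Collection<TopicPartition> partitions);

    void onPartitionKeyAssigned(Collection<TopicPartition> partitions);
}

/** Example listener keeping a per-key counter, flushed to and loaded from a shared store. */
class CountingStateListener implements PartitionKeyRebalanceListener {
    private final Map<String, Long> sharedStore;                                 // hypothetical durable store
    private final Map<TopicPartition, Map<String, Long>> local = new HashMap<>(); // per-partition key state

    CountingStateListener(Map<String, Long> sharedStore) {
        this.sharedStore = sharedStore;
    }

    /** Applies one consumed record to local state. */
    void onRecord(TopicPartition tp, String key) {
        local.computeIfAbsent(tp, t -> new HashMap<>()).merge(key, 1L, Long::sum);
    }

    long localCount(TopicPartition tp, String key) {
        return local.getOrDefault(tp, Map.of()).getOrDefault(key, 0L);
    }

    @Override
    public void onPartitionKeyMaybeRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            Map<String, Long> state = local.remove(tp);
            if (state != null) {
                sharedStore.putAll(state); // flush before another consumer loads these keys
            }
        }
    }

    @Override
    public void onPartitionKeyAssigned(Collection<TopicPartition> partitions) {
        // Simplification: loads every stored key; a real application would load only the
        // keys that can map to each newly assigned partition.
        for (TopicPartition tp : partitions) {
            local.put(tp, new HashMap<>(sharedStore));
        }
    }
}
```

If consumer A counts two messages for `user-1` on partition 0 and flushes on revocation, consumer B loads that state for partition 1. B's count for `user-1` then continues from 2 rather than restarting from 0.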

Topic config

 
1) Add the per-topic config enable.ordered.delivery. The default value is true. When it is set to false, the consumer will not perform the operations described in Proposed Changes 4), 7) and 8) for partitions of the given topic.
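A consumer would need to check this config per topic before doing the extra ordering work. The sketch below reads it with the proposed default of true. The class name and the strict parsing, which rejects anything other than true or false, are assumptions.

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper that reads enable.ordered.delivery from a topic's config map. */
class OrderedDeliveryConfig {
    static final String KEY = "enable.ordered.delivery";

    /** Returns true when the config is absent (the proposed default) or explicitly "true". */
    static boolean orderedDeliveryEnabled(Map<String, String> topicConfigs) {
        String value = topicConfigs.get(KEY);
        if (value == null) {
            return true; // proposed default
        }
        switch (value.trim().toLowerCase(Locale.ROOT)) {
            case "true":
                return true;
            case "false":
                return false;
            default:
                throw new IllegalArgumentException("invalid value for " + KEY + ": " + value);
        }
    }
}
```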

...