
Status

Current state: Under Discussion

Discussion thread

JIRA

Motivation

 

Kafka is designed so that messages with the same key from the same producer are consumed in the order in which they were produced. This guarantee is useful for applications that maintain local state per key. However, under the current design of Kafka, in-order delivery is not guaranteed when the number of partitions of a topic is expanded. This KIP proposes a design that preserves in-order message delivery even when the partitions of a topic are expanded.

 

Public Interfaces

Zookeeper

Update the znodes /brokers/topics/[topic]/partitions/[partition] to use the following json format

{
  "version" : int32,
  "partition_epoch" : int32,
  "leaderEpochAfterCreation" : {      <-- NEW. This represents a map from partition to leaderEpoch for lower partitions.
    int32 -> int32
    ...
  },
  "leaderEpochBeforeDeletion" : {     <-- NEW. This represents a map from partition to leaderEpoch for lower partitions.
    int32 -> int32
    ...
  }
}
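
As an illustration, a znode payload in this format could be written and read back as sketched below. This is only a sketch under stated assumptions: the helper names `encode_partition_znode` and `decode_partition_znode` are hypothetical, and because JSON object keys are strings, it assumes the int32 partition ids are stored as strings and converted back to integers on read.

```python
import json


def encode_partition_znode(version, partition_epoch, after_creation, before_deletion):
    """Serialize the proposed partition znode payload (hypothetical helper)."""
    payload = {
        "version": version,
        "partition_epoch": partition_epoch,
        # JSON object keys must be strings, so partition ids are stringified.
        "leaderEpochAfterCreation": {str(p): e for p, e in after_creation.items()},
        "leaderEpochBeforeDeletion": {str(p): e for p, e in before_deletion.items()},
    }
    return json.dumps(payload, sort_keys=True)


def decode_partition_znode(data):
    """Parse the payload and restore integer partition ids (hypothetical helper)."""
    raw = json.loads(data)
    after = raw.get("leaderEpochAfterCreation", {})
    before = raw.get("leaderEpochBeforeDeletion", {})
    return {
        "version": raw["version"],
        "partition_epoch": raw["partition_epoch"],
        "leaderEpochAfterCreation": {int(p): e for p, e in after.items()},
        "leaderEpochBeforeDeletion": {int(p): e for p, e in before.items()},
    }
```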

Protocol

1) Update LeaderAndIsrRequest to re-use topic field for all its partitions and add field undeleted_partition_count for each topic.

LeaderAndIsrRequest => controller_id controller_epoch topic_states live_leaders
  controller_id => int32
  controller_epoch => int32
  topic_states => [LeaderAndIsrRequestTopicState]    <-- NEW. This field includes LeaderAndIsrRequestPartitionState
  live_leaders => [LeaderAndIsrRequestLiveLeader]

LeaderAndIsrRequestTopicState => topic undeleted_partition_count partition_states
  topic => str                                       <-- This is moved from LeaderAndIsrRequestPartitionState.
  undeleted_partition_count => int32                 <-- NEW. This is the total number of partitions of this topic.
  partition_states => [LeaderAndIsrRequestPartitionState]
 
LeaderAndIsrRequestPartitionState => partition controller_epoch leader leader_epoch isr zk_version replicas is_new_replica
  partition => int32
  controller_epoch => int32
  leader => int32
  leader_epoch => int32
  isr => [int32]
  zk_version => int32
  replicas => [int32]
  is_new_replica => boolean


2) Update ProduceRequest to include undeleted_partition_count per topic.

ProduceRequest => transactional_id acks timeout topic_data
  transactional_id => nullable_str
  acks => int16
  timeout => int32
  topic_data => [TopicData]
 
TopicData => topic undeleted_partition_count data
  topic => str
  undeleted_partition_count => int32    <-- NEW. This is the number of undeleted partitions of this topic expected by the producer.
  data => [PartitionData]
 
PartitionData => partition record_set
  partition => int32
  record_set => Records

 

3) Update HeartbeatRequest to include fields need_position and position for relevant partitions.

HeartbeatRequest => group_id generation_id member_id topics
  group_id => str
  generation_id => int32
  member_id => str
  topics => [HeartbeatRequestTopic]               <-- NEW
 
HeartbeatRequestTopic => topic partitions
  topic => str                                    <-- NEW
  partitions => [HeartbeatRequestPartition]       <-- NEW
 
HeartbeatRequestPartition => partition need_position position
  partition => int32         <-- NEW
  need_position => boolean   <-- NEW. If true, HeartbeatResponse should include the position of this partition of the group.
  position => int64          <-- NEW. Position of this partition of this consumer.

 

4) Update HeartbeatResponse to include fields need_position and position for relevant partitions.

HeartbeatResponse => throttle_time_ms topics error_code
  throttle_time_ms => int32
  topics => [HeartbeatResponseTopic]                 <-- NEW
  error_code => int16
 
HeartbeatResponseTopic => topic partitions
  topic => str                                       <-- NEW
  partitions => [HeartbeatResponsePartition]         <-- NEW
 
HeartbeatResponsePartition => partition need_position position
  partition => int32          <-- NEW
  need_position => boolean    <-- NEW. If true, HeartbeatRequest should include the position of this partition of this consumer.
  position => int64           <-- NEW. Position of this partition of the group.

 

5) Add PartitionLeaderEpochsForPartitionsRequest and PartitionLeaderEpochsForPartitionsResponse. PartitionLeaderEpochsForPartitionsResponse essentially encodes the leaderEpochAfterCreation and the leaderEpochBeforeDeletion map for those partitions specified in the PartitionLeaderEpochsForPartitionsRequest.

PartitionLeaderEpochsForPartitionsRequest => topics
  topics => [PartitionLeaderEpochsForPartitionsRequestTopic]
 
PartitionLeaderEpochsForPartitionsRequestTopic => topic partitions
  topic => str
  partitions => [int32]
 
PartitionLeaderEpochsForPartitionsResponse => throttle_time_ms topics
  throttle_time_ms => int32
  topics => [PartitionLeaderEpochsForPartitionsResponseTopic]
 
PartitionLeaderEpochsForPartitionsResponseTopic => topic partitions
  topic => str
  partitions => [PartitionLeaderEpochsForPartitionsResponsePartition]
 
PartitionLeaderEpochsForPartitionsResponsePartition => partition leader_epoch_after_creation leader_epoch_before_deletion
  partition => int32
  leader_epoch_after_creation => int32   // -1 if the given partition is not in leaderEpochAfterCreation of the partition znode.
  leader_epoch_before_deletion => int32  // -1 if the given partition is not in leaderEpochBeforeDeletion of the partition znode.

 

6) Update UpdateMetadataRequest to re-use topic field for all its partitions and add field undeleted_partition_count for each topic.

UpdateMetadataRequest => controller_id controller_epoch max_partition_epoch topic_states live_brokers
  controller_id => int32
  controller_epoch => int32
  max_partition_epoch => int32
  topic_states => [UpdateMetadataRequestTopicState]
  live_brokers => [UpdateMetadataRequestBroker]
 
UpdateMetadataRequestTopicState => topic undeleted_partition_count partition_states
  topic => str
  undeleted_partition_count => int32                       <-- NEW
  partition_states => [UpdateMetadataRequestPartitionState]
 
UpdateMetadataRequestPartitionState => partition controller_epoch leader leader_epoch partition_epoch isr zk_version replicas offline_replicas
  partition => int32
  controller_epoch => int32
  leader => int32
  leader_epoch => int32
  partition_epoch => int32
  isr => [int32]
  zk_version => int32
  replicas => [int32]
  offline_replicas => [int32]


7) Add field undeleted_partition_count for each topic in MetadataResponse.

MetadataResponse => throttle_time_ms max_partition_epoch brokers cluster_id controller_id topic_metadata 
  throttle_time_ms => int32
  max_partition_epoch => int32
  brokers => [MetadataBroker]
  cluster_id => nullable_str
  controller_id => int32
  topic_metadata => [TopicMetadata]

TopicMetadata => topic_error_code topic undeleted_partition_count is_internal partition_metadata
  topic_error_code => int16
  topic => str
  undeleted_partition_count => int32               <-- NEW
  is_internal => boolean
  partition_metadata => [PartitionMetadata]
 
PartitionMetadata => partition_error_code partition_id leader replicas leader_epoch partition_epoch isr offline_replicas
  partition_error_code => int16
  partition_id => int32
  leader => int32
  replicas => [int32]
  leader_epoch => int32
  partition_epoch => int32
  isr => [int32]
  offline_replicas => [int32]

 

 

Proposed Changes


1) Changes in the controller for handling partition expansion

Here we describe how a change to the topic znode triggers the partition expansion logic in the controller.

- The user runs kafka-topics.sh to update the topic znode with the new assignment. This triggers the topic znode listener in the controller.

- For each partition of this topic that already has a partition znode, the controller increments its leaderEpoch by 1 in the partition znode. The controller then sends LeaderAndIsrRequest and waits for LeaderAndIsrResponse. The LeaderAndIsrRequest should include the new leaderEpoch for each partition and the undeleted_partition_count of the topic.

- For each partition of this topic that does not yet have a partition znode, the controller creates the partition znode such that the leaderEpochAfterCreation field in the znode data maps the partitions of this topic to their corresponding leaderEpoch (as recorded before the controller incremented the leaderEpoch).

- The controller propagates the UpdateMetadataRequest with the latest undeleted_partition_count per topic.

- The controller continues with the existing partition expansion logic.

Note that this procedure is fault tolerant. If the controller fails in any of these steps, the new controller can continue creating partition znodes following the same procedure.
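
The expansion steps above can be sketched against an in-memory stand-in for the partition znodes. This is a simplified illustration, not the controller implementation: the function name `expand_partitions`, the dict layout, and the initial leaderEpoch of 0 for newly created partitions are assumptions, and the sketch assumes that leaderEpochAfterCreation covers the partitions that already existed before the expansion and that no partition is marked for deletion.

```python
def expand_partitions(partition_znodes, new_partition_count):
    """Apply the expansion steps to an in-memory model of the partition znodes.

    partition_znodes maps partition id -> {"leader_epoch": int,
    "leaderEpochAfterCreation": {partition id -> leader epoch}}.
    Returns the undeleted_partition_count to propagate to brokers.
    """
    existing = sorted(partition_znodes)
    # Record the leader epochs before they are incremented.
    epochs_before = {p: partition_znodes[p]["leader_epoch"] for p in existing}

    # Existing partitions: increment leaderEpoch by 1. A real controller would
    # now send LeaderAndIsrRequest and wait for LeaderAndIsrResponse.
    for p in existing:
        partition_znodes[p]["leader_epoch"] += 1

    # New partitions: create the znode with the recorded epochs.
    for p in range(new_partition_count):
        if p not in partition_znodes:
            partition_znodes[p] = {
                "leader_epoch": 0,  # assumed initial epoch for a new partition
                "leaderEpochAfterCreation": dict(epochs_before),
            }

    # Assumes no partition of the topic is marked for deletion.
    return new_partition_count
```

For example, expanding a topic whose partitions 0 and 1 are at leaderEpoch 4 and 2 to three partitions leaves them at 5 and 3, and gives partition 2 a leaderEpochAfterCreation of {0: 4, 1: 2}.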


2) Changes in the controller for handling partition deletion




3) Changes in how the broker handles ProduceRequest

- When the broker receives a LeaderAndIsrRequest, in addition to the existing procedure (e.g. updating the leaderEpochCache for the new leaderEpoch), the broker should record in memory the undeleted_partition_count for each topic.

- When the broker receives a ProduceRequest, for each partition in the request it checks whether the request's undeleted_partition_count for the topic equals the undeleted_partition_count from the most recent LeaderAndIsrRequest. If so, the broker handles the produce request as it does today. If not, the broker rejects this partition with InvalidPartitionMetadataException. This error extends InvalidMetadataException and should trigger the producer to update its metadata and retry.
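
The broker-side check can be sketched as follows. The class name `BrokerPartitionCountCheck` and its method names are hypothetical, and the error is represented as a plain string instead of an exception class. The sketch also assumes that a topic for which no LeaderAndIsrRequest has been seen is rejected.

```python
class BrokerPartitionCountCheck:
    """In-memory record of undeleted_partition_count per topic (hypothetical)."""

    def __init__(self):
        self._counts = {}  # topic -> count from the latest LeaderAndIsrRequest

    def on_leader_and_isr(self, topic, undeleted_partition_count):
        # Remember the count carried by the most recent LeaderAndIsrRequest.
        self._counts[topic] = undeleted_partition_count

    def validate_produce(self, topic, undeleted_partition_count):
        """Return None to accept, or the error name to reject."""
        if self._counts.get(topic) == undeleted_partition_count:
            return None
        return "InvalidPartitionMetadataException"
```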


4) Changes in how producer constructs ProduceRequest

- Producer should include the undeleted_partition_count for each topic in the ProduceRequest.

- Producer will update its metadata and retry the ProduceRequest if the ProduceResponse shows InvalidPartitionMetadataException, which happens when the producer's undeleted_partition_count differs from (is either newer or older than) the undeleted_partition_count in the broker.
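
A minimal sketch of the producer-side retry loop is shown below. The function `produce_with_retry`, the callables `send` and `fetch_partition_count`, and the `max_attempts` limit are all hypothetical stand-ins for the producer's network and metadata layers. Errors are modelled as strings.

```python
def produce_with_retry(send, fetch_partition_count, topic, records, max_attempts=3):
    """Send records, refreshing metadata when the broker rejects the count.

    send(topic, undeleted_partition_count, records) returns None on success
    or an error name; fetch_partition_count(topic) returns the count from
    the producer's (possibly refreshed) metadata.
    """
    count = fetch_partition_count(topic)
    for _ in range(max_attempts):
        error = send(topic, count, records)
        if error is None:
            return count  # the count that the broker accepted
        if error != "InvalidPartitionMetadataException":
            raise RuntimeError(error)
        # Producer and broker disagree: refresh metadata and retry.
        count = fetch_partition_count(topic)
    raise RuntimeError("gave up after %d attempts" % max_attempts)
```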


5) Changes in the leader of the consumer group

The leader of the consumer group queries the metadata to split the partition list into partitions that have not been marked for deletion and partitions that have been marked for deletion. It should apply the user-defined assignment algorithm to these two lists separately to determine the partition distribution across consumers in the group, so that the partitions that have not been marked for deletion are also evenly distributed across consumers. This prevents load imbalance across consumers, because no new data will be written to partitions that have been marked for deletion.
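
The two-list assignment can be sketched as below. The `round_robin` function is only a stand-in for the user-defined assignment algorithm, and `assign_separately` is a hypothetical name. Neither is part of the Kafka client API.

```python
def round_robin(partitions, consumers):
    """Stand-in for the user-defined assignment algorithm."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(sorted(partitions)):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment


def assign_separately(partitions, marked_for_deletion, consumers, assignor=round_robin):
    """Run the assignor on live and to-be-deleted partitions independently."""
    live = [p for p in partitions if p not in marked_for_deletion]
    deleting = [p for p in partitions if p in marked_for_deletion]
    result = {c: [] for c in consumers}
    for group in (live, deleting):
        for consumer, assigned in assignor(group, consumers).items():
            result[consumer].extend(assigned)
    return result
```

With six partitions of which 1 and 3 are marked for deletion, and two consumers a and b, a plain round robin over all partitions would give a three live partitions and b only one. Assigning the two lists separately gives each consumer two live partitions and one partition marked for deletion.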


6) Changes in how consumer handles HeartbeatRequest and HeartbeatResponse

- HeartbeatRequest includes the consumer's current position for each partition that the coordinator requested in the previous HeartbeatResponse. It also includes the list of partitions for which the consumer wants to know the position (of the consumer that is consuming this partition).

- The group coordinator remembers the positions for those partitions that are of interest to some consumer of the given group.

- HeartbeatResponse includes the position for the requested partitions, based on the most recent HeartbeatRequest from consumers of the group. It also includes the list of partitions that are of interest to some consumer of the given group.
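
The coordinator-side bookkeeping for this exchange can be sketched as follows. The class `GroupPositionTracker` and its method signature are hypothetical. Partitions are modelled as (topic, partition) tuples, and the need_position flags are modelled as a set.

```python
class GroupPositionTracker:
    """Tracks positions for partitions that some group member asked about."""

    def __init__(self):
        self._positions = {}       # (topic, partition) -> last reported position
        self._interesting = set()  # partitions some consumer wants positions for

    def handle_heartbeat(self, reported, wanted):
        """Process one HeartbeatRequest and build the HeartbeatResponse fields.

        reported: {(topic, partition): position} sent by this consumer.
        wanted: list of (topic, partition) whose group position it needs.
        Returns (positions for wanted partitions, partitions needing positions).
        """
        self._interesting.update(wanted)
        for tp, position in reported.items():
            if tp in self._interesting:
                self._positions[tp] = position
        positions = {tp: self._positions[tp] for tp in wanted if tp in self._positions}
        return positions, set(self._interesting)
```

In this model, a consumer that wants the position of a partition first registers interest. The owner of that partition then sees it in the returned set and reports its position in its next heartbeat, and the first consumer receives that position on a later heartbeat.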


7) Changes in how consumer consumes partition

1. Consumer receives SyncGroupResponse, which contains its assigned partitions.

2. Consumer gets the startPosition, i.e. the committedOffset, for its assigned partitions.

3. Consumer sends ListOffsetRequest to get the earliest offset for its assigned partitions.

4. For each partition P1 whose startPosition is not available, or whose startPosition equals the earliest offset, the consumer does the following before consuming partition P1:

    4.1 Consumer sends PartitionLeaderEpochsForPartitionsRequest to the coordinator to get the leaderEpochAfterCreation map for partition P1, which the broker can read from the corresponding partition znode. The consumer then sends OffsetsForLeaderEpochRequest to convert the leaderEpochAfterCreation map from (priorPartition -> oldLeaderEpoch) to (priorPartition -> lowerOffsetThreshold), where lowerOffsetThreshold should be the last offset of messages published under the oldLeaderEpoch for the given priorPartition.

    4.2 Consumer includes the keys (i.e. partitions) of the leaderEpochAfterCreation map in the HeartbeatRequest and gets the corresponding consumer group positions for these partitions in the HeartbeatResponse. The consumer only starts to consume partition P1 once, for every priorPartition, the position >= lowerOffsetThreshold of that priorPartition.

5. For each partition P1 assigned to this consumer, the consumer queries the metadata to see whether any partition of this topic has been marked for deletion. If so, the consumer does the following before delivering a message with offset T from partition P1:

    5.1 Consumer sends PartitionLeaderEpochsForPartitionsRequest to the coordinator to get the leaderEpochBeforeDeletion map for all partitions of this topic. Note that a partition is marked for deletion if and only if the leaderEpochBeforeDeletion map in its partition znode is not empty. For each partition P2 whose leaderEpochBeforeDeletion includes partition P1, the consumer then sends OffsetsForLeaderEpochRequest to convert the leaderEpoch (i.e. P2.leaderEpochBeforeDeletion[P1]) to upperOffsetThreshold. This results in a map from P2 -> upperOffsetThreshold, where P2 ranges over all partitions whose leaderEpochBeforeDeletion includes partition P1.

    5.2 For all partitions whose leaderEpochBeforeDeletion includes partition P1, the consumer includes these partitions in the HeartbeatRequest and gets the corresponding consumer group positions for these partitions in the HeartbeatResponse. The consumer also sends ListOffsetRequest to get the LogEndOffset for these partitions.

    5.3 Consumer only consumes a message with offset T from partition P1 if, for every partition P2 whose leaderEpochBeforeDeletion includes partition P1, either the consumer group's position for P2 has reached the LogEndOffset of P2, or the offset T <= upperOffsetThreshold.
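
The two gating conditions in steps 4.2 and 5.3 can be sketched as predicates over plain dictionaries. The function names `can_start_consuming` and `can_deliver` are hypothetical. The sketch assumes the thresholds, group positions, and LogEndOffsets have already been fetched as described above, and it treats a partition with no known group position as not yet caught up.

```python
def can_start_consuming(lower_offset_thresholds, group_positions):
    """Step 4.2: every prior partition must have position >= lowerOffsetThreshold.

    lower_offset_thresholds: {priorPartition: lowerOffsetThreshold}
    group_positions: {partition: position of the consumer group}
    """
    return all(
        prior in group_positions and group_positions[prior] >= threshold
        for prior, threshold in lower_offset_thresholds.items()
    )


def can_deliver(offset, upper_offset_thresholds, group_positions, log_end_offsets):
    """Step 5.3: decide whether a message at `offset` from P1 may be delivered.

    upper_offset_thresholds: {P2: upperOffsetThreshold} for every P2 whose
    leaderEpochBeforeDeletion includes P1.
    """
    for p2, upper in upper_offset_thresholds.items():
        # P2 is drained once the group position has reached its LogEndOffset.
        drained = p2 in group_positions and group_positions[p2] >= log_end_offsets[p2]
        if not drained and offset > upper:
            return False
    return True
```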



Compatibility, Deprecation, and Migration Plan

This KIP changes the inter-broker protocol, so the migration requires two rolling bounces. In the first rolling bounce, we deploy the new code while brokers still communicate using the existing protocol. In the second rolling bounce, we change the config so that brokers start to communicate with each other using the new protocol.

Rejected Alternatives

 

Future work

 

 

 
