...

Proposed Changes


1) Changes in the controller for handling partition expansion

- User executes kafka-topics.sh to update the topic znode with the new assignment. This triggers the topic znode listener in controller.

- For those partitions of this topic which already have the partition znode, controller increments their leaderEpoch by 1 in the partition znode. Controller sends LeaderAndIsrRequest and waits for LeaderAndIsrResponse. The LeaderAndIsrRequest should include the new leaderEpoch for each partition and the undeleted_partition_count of the topic.

- For each partition of this topic which does not have the partition znode, controller creates the partition znode, such that the leaderEpochAfterCreation field in the znode data maps each existing partition of this topic to its corresponding leaderEpoch (recorded before controller increments the leaderEpoch).

- Controller propagates the UpdateMetadataRequest with the latest undeleted_partition_count per topic.

- Controller continues the existing logic of partition expansion.

Note that this procedure is fault tolerant. If controller fails in any of these steps, the new controller can continue creating partition znodes following the same procedure.

The motivation of this change is that, for each existing partition before the partition expansion, there exists a leaderEpoch such that all messages after this leaderEpoch will be produced by producers using the incremented undeleted_partition_count. If, for all existing partitions, a consumer group has delivered all messages up to such leaderEpoch, then it should be safe to consume messages from the newly created partition without worrying about out-of-order delivery. Consumers can get such leaderEpoch from the partition znode, convert the leaderEpoch to a lowerOffsetThreshold using OffsetsForLeaderEpochRequest, and make sure that they only start consuming the new partition after messages before the lowerOffsetThreshold have all been consumed.
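As an illustrative sketch (not part of this proposal), the consumer-side safety check described above could look as follows in Java, where lowerOffsetThreshold is the per-prior-partition offset obtained via OffsetsForLeaderEpochRequest and consumedPosition is the consumer group's delivered position; both names are assumptions.

Code Block
// Illustrative sketch only. Checks whether every prior partition has been consumed
// up to the lowerOffsetThreshold derived from the leaderEpochAfterCreation map, so
// that the newly created partition can be consumed without out-of-order delivery.
import java.util.Map;

public class ExpansionSafetyCheck {

    static boolean safeToConsumeNewPartition(Map<Integer, Long> lowerOffsetThreshold,
                                             Map<Integer, Long> consumedPosition) {
        for (Map.Entry<Integer, Long> entry : lowerOffsetThreshold.entrySet()) {
            long position = consumedPosition.getOrDefault(entry.getKey(), 0L);
            if (position < entry.getValue()) {
                return false;  // messages before the lowerOffsetThreshold are not yet all consumed
            }
        }
        return true;
    }
}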

 

2) Changes in the controller for handling partition deletion

- User executes "kafka-topics.sh --alter --partition partitionNum --topic topic" to alter the partition count of the given topic. If the given undeleted_partition_count is smaller than both the current partition count and the current undeleted_partition_count of the given topic, the script writes the given undeleted_partition_count in the topic znode. This triggers the topic znode listener in controller.

- For each partition P1 whose partitionId >= undeleted_partition_count, if the leaderEpochBeforeDeletion map in its partition znode is empty or not specified, controller updates the partition znode of P1 such that leaderEpochBeforeDeletion maps every undeleted partition P2 to its current leaderEpoch. Controller also increments the leaderEpoch by 1 in the partition znode of every P2. Then controller sends LeaderAndIsrRequest with the updated undeleted_partition_count for this topic and the updated leaderEpoch for every P2.

- Controller propagates the UpdateMetadataRequest with the latest undeleted_partition_count per topic.

- Controller schedules a task which periodically sends ListOffsetRequest to check the size of those partitions which have been marked for deletion, i.e. those with a non-empty leaderEpochBeforeDeletion map in the partition znode. If the size of a yet-to-be-deleted partition is 0, i.e. its logStartOffset == logEndOffset, controller sends StopReplicaRequest with delete=true, removes the partition from the assignment in the topic znode, and removes the partition znode.

 

Note that this procedure is fault tolerant. If controller fails in any of these steps, the new controller can continue by reading the undeleted_partition_count from the topic znode, updating the partition znodes and sending LeaderAndIsrRequest.

The motivation of this change is that, for each undeleted partition after the partition deletion, there exists a leaderEpoch such that all messages up to this leaderEpoch are produced by producers using the old undeleted_partition_count prior to the partition deletion. If the consumer group has delivered all messages in a partition that has been marked for deletion, it should be safe for this consumer group to consume any message in the undeleted partitions. If the consumer group has not delivered some messages in a partition that has been marked for deletion, it should still be safe for this consumer group to deliver messages up to the corresponding leaderEpoch in the undeleted partitions.
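A minimal sketch of the periodic deletion check described above, assuming the controller has already collected logStartOffset and logEndOffset from the ListOffsetResponse for every partition marked for deletion (the class and method names are hypothetical):

Code Block
// Hypothetical sketch: selects the partitions marked for deletion whose log is empty
// (logStartOffset == logEndOffset) and which can therefore receive StopReplicaRequest
// with delete=true and have their znodes removed.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class DeletionCheckTask {

    record Offsets(long logStartOffset, long logEndOffset) {}

    static List<Integer> partitionsReadyForDeletion(Map<Integer, Offsets> markedForDeletion) {
        List<Integer> ready = new ArrayList<>();
        for (Map.Entry<Integer, Offsets> entry : markedForDeletion.entrySet()) {
            Offsets offsets = entry.getValue();
            if (offsets.logStartOffset() == offsets.logEndOffset()) {
                ready.add(entry.getKey());  // no remaining data in this partition
            }
        }
        return ready;
    }
}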

 

3) Changes in how broker handles ProduceRequest

- When broker receives LeaderAndIsrRequest, in addition to the existing procedure (e.g. updating the leaderEpochCache for the new leaderEpoch), the broker should record in memory the undeleted_partition_count for each topic.

- When broker receives ProduceRequest, for each partition in the request, broker checks whether its undeleted_partition_count equals the undeleted_partition_count from the most recent LeaderAndIsrRequest. If yes, broker handles the produce request in the current way. If no, broker rejects this partition with InvalidPartitionMetadataException. This error extends InvalidMetadataException and should trigger the producer to update its metadata and retry.

The motivation of this change is that all messages after the incremented leaderEpoch (read from the LeaderAndIsrRequest) will be produced by producers using the latest undeleted_partition_count (also read from the same LeaderAndIsrRequest). Note that the leaderEpoch and the undeleted_partition_count are updated atomically using the same LeaderAndIsrRequest.
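The broker-side validation can be sketched as below; this is illustrative code rather than actual broker internals, and the class, field and method names are assumptions.

Code Block
// Illustrative sketch of the broker-side check: produce data for a topic is only accepted
// if the producer's undeleted_partition_count matches the value recorded from the most
// recent LeaderAndIsrRequest; otherwise it is rejected with InvalidPartitionMetadataException
// so the producer refreshes its metadata and retries.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ProduceRequestValidator {

    // topic -> undeleted_partition_count from the most recent LeaderAndIsrRequest
    private final Map<String, Integer> undeletedPartitionCount = new ConcurrentHashMap<>();

    void onLeaderAndIsrRequest(String topic, int count) {
        undeletedPartitionCount.put(topic, count);
    }

    boolean isProducerCountValid(String topic, int producerUndeletedPartitionCount) {
        Integer current = undeletedPartitionCount.get(topic);
        return current != null && current == producerUndeletedPartitionCount;
    }
}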


4) Changes in how producer constructs ProduceRequest

- Producer should include the undeleted_partition_count for each topic in the ProduceRequest.

- Producer should determine the partition for both keyed and unkeyed messages using undeleted_partition_count as the partition count.

- Producer will update metadata and retry ProduceRequest if ProduceResponse shows InvalidPartitionMetadataException, which happens if the producer's undeleted_partition_count is different (either newer or older) from the undeleted_partition_count in the broker.

The motivation of this change is that the producer always uses the undeleted_partition_count in the leader broker to determine the partition of each message.
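A sketch of the partitioning rule, using a hypothetical helper rather than the actual DefaultPartitioner: the only behavioural change is that undeleted_partition_count replaces the total partition count in the modulo, for keyed and unkeyed messages alike.

Code Block
// Hypothetical sketch: partition selection bounded by undeleted_partition_count.
// The real producer hashes keys with murmur2; a plain array hash is used here only
// to keep the sketch self-contained.
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

public class UndeletedCountPartitioner {

    // Keyed messages: hash the key over the undeleted partitions only.
    static int partitionForKey(byte[] keyBytes, int undeletedPartitionCount) {
        int hash = Arrays.hashCode(keyBytes) & 0x7fffffff;
        return hash % undeletedPartitionCount;
    }

    // Unkeyed messages: pick any undeleted partition.
    static int partitionForUnkeyed(int undeletedPartitionCount) {
        return ThreadLocalRandom.current().nextInt(undeletedPartitionCount);
    }
}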


5) Changes in the leader of the consumer group

- Leader of the consumer group queries the metadata to split the partition list into those partitions that have not been marked for deletion and those partitions that have been marked for deletion. It should apply the user-defined assignment algorithm to these two lists separately to determine the partition distribution across consumers in the group, so that partitions which have not been marked for deletion can also be evenly distributed across consumers. This is to prevent load imbalance across consumers because there will be no new data in those partitions which have been marked for deletion.

The motivation of this change is to evenly distribute the byte-in-rate of undeleted partitions across consumers of the consumer group. Note that even though there is still remaining data in the partitions which have been marked for deletion, there is no new data going into these partitions. Thus a consumer will likely be under-utilized if it is assigned few undeleted partitions but many partitions which have been marked for deletion.
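A sketch of the split, under the assumption (from change 2 above) that the partitions with partitionId >= undeleted_partition_count are exactly the ones marked for deletion; the class and method names are illustrative.

Code Block
// Illustrative sketch: the group leader splits the topic's partitions into an undeleted
// list and a marked-for-deletion list; the user-defined assignment algorithm is then
// applied to each list separately so both kinds of partitions spread evenly across consumers.
import java.util.ArrayList;
import java.util.List;

public class SplitAssignment {

    static List<List<Integer>> splitByDeletionStatus(int partitionCount, int undeletedPartitionCount) {
        List<Integer> undeleted = new ArrayList<>();
        List<Integer> markedForDeletion = new ArrayList<>();
        for (int partition = 0; partition < partitionCount; partition++) {
            if (partition < undeletedPartitionCount) {
                undeleted.add(partition);           // still receives new data
            } else {
                markedForDeletion.add(partition);   // only remaining old data
            }
        }
        return List.of(undeleted, markedForDeletion);
    }
}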

 

6) Changes in how consumer handles HeartbeatRequest and HeartbeatResponse

- HeartbeatRequest includes the current position for each partition that the coordinator requested in the previous HeartbeatResponse. It also includes the list of partitions for which this consumer wants to know the position (of the consumer that is consuming that partition).

- Group coordinator keeps a map that maps each partition to the list of consumer ids that are interested in the position of this partition. Group coordinator also remembers the positions for those partitions which are of interest to some consumers of the given group. Both pieces of information may be updated when coordinator receives HeartbeatRequest.

- HeartbeatResponse includes the list of partitions that this consumer is interested in. For each partition P1 that is assigned to this consumer AND whose position is of interest to some consumers of the consumer group, if the following condition is satisfied, the consumer should execute the callback PartitionKeyRebalanceListener.onPartitionKeyMaybeRevoked(Collection<TopicPartition>) and include the position of the partition P1 in the HeartbeatResponse.

Here is the condition for including partition P1, assuming its current position is Pos. This condition optimizes the overhead of the onPartitionKeyMaybeRevoked() callback such that the consumer will not trigger the callback and will not include the position of the partition in the HeartbeatResponse when it knows for sure that this position will not unblock other consumers from starting to consume some partitions.

Code Block
(The position of partition P1 has not been included in a HeartbeatResponse since this consumer started) ||
(The position of partition P1 has increased since the last HeartbeatResponse AND it equals the high watermark of P1 in the last FetchResponse) ||
(The largest leaderEpoch of those messages before the position of P1 has increased since the last HeartbeatResponse)

 

The motivation of this change is to allow consumers of the same consumer group to communicate the position of partitions with each other. This allows a consumer to start consuming beyond a given offset of a partition only after a certain condition is satisfied for the position of other partitions in the consumer group.
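The condition in the code block above can be sketched as a boolean check; the parameter names are assumptions used only for illustration.

Code Block
// Illustrative sketch of the reporting condition: decides whether the consumer should
// invoke onPartitionKeyMaybeRevoked() and include the position of partition P1 in the
// HeartbeatResponse.
public class PositionReportCheck {

    static boolean shouldReportPosition(boolean reportedSinceConsumerStart,
                                        long lastReportedPosition,
                                        long currentPosition,
                                        long highWatermarkFromLastFetch,
                                        int lastReportedLargestLeaderEpoch,
                                        int largestLeaderEpochBeforePosition) {
        boolean neverReported = !reportedSinceConsumerStart;
        boolean caughtUpToHighWatermark =
                currentPosition > lastReportedPosition && currentPosition == highWatermarkFromLastFetch;
        boolean leaderEpochAdvanced =
                largestLeaderEpochBeforePosition > lastReportedLargestLeaderEpoch;
        return neverReported || caughtUpToHighWatermark || leaderEpochAdvanced;
    }
}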

 

7) Changes in how consumer consumes partition

1. Consumer receives SyncGroupResponse, which contains its assigned partitions

2. Consumer gets the startPosition, i.e. the committedOffset, for its assigned partitions.

3. Consumer sends ListOffsetRequest to get the earliest offset for its assigned partitions.

4. For each partition P1 whose startPosition is not available, or whose startPosition equals the earliest offset, consumer does the following before consuming the partition P1:

    4.1 Consumer sends PartitionLeaderEpochsForPartitionsRequest to the coordinator to get the leaderEpochAfterCreation map for the partition P1, which can be read by broker from the corresponding partition znode. Then the consumer sends OffsetsForLeaderEpochRequest to convert the leaderEpochAfterCreation from (priorPartition -> oldLeaderEpoch) to (priorPartition -> lowerOffsetThreshold), where lowerOffsetThreshold should be the last offset of messages published under the oldLeaderEpoch for the given priorPartition.

    4.2 Consumer includes the keys (i.e. partitions) of the leaderEpochAfterCreation map in the HeartbeatRequest and gets the corresponding position of these partitions of the consumer group in the HeartbeatResponse. If, for every priorPartition, the position > the lowerOffsetThreshold of that priorPartition, then the consumer executes the callback PartitionKeyRebalanceListener.onPartitionKeyAssigned(Collection<TopicPartition> partitions) and starts to consume partition P1.

5. For each partition P1 assigned to this consumer, consumer queries the metadata to see if any partition of this topic has been marked for deletion. If so, consumer does the following before delivering a message with offset T from this partition P1:

    5.1 Consumer sends PartitionLeaderEpochsForPartitionsRequest to coordinator to get the leaderEpochBeforeDeletion map for all partitions of this topic. Note that a partition is marked for deletion if and only if the leaderEpochBeforeDeletion map in its partition znode is not empty. For each partition P2 whose leaderEpochBeforeDeletion includes the partition P1, consumer then sends OffsetsForLeaderEpochRequest to convert the leaderEpoch (i.e. P2.leaderEpochBeforeDeletion[P1]) to upperOffsetThreshold. This results in a map from P2 -> upperOffsetThreshold, where P2 represents all those partitions whose leaderEpochBeforeDeletion includes the partition P1.

    5.2 For all partitions whose leaderEpochBeforeDeletion includes the partition P1, consumer includes these partitions in the HeartbeatRequest and gets the corresponding position of these partitions of the consumer group in the HeartbeatResponse. Consumer also sends ListOffsetRequest to get the LogEndOffset for these partitions.

    5.3 For every partition P2 whose leaderEpochBeforeDeletion includes the partition P1, if either the position of P2 of the consumer group has reached the LogEndOffset of P2 or the offset T <= the upperOffsetThreshold of P2, then the consumer will execute the callback PartitionKeyRebalanceListener.onPartitionKeyAssigned(Collection<TopicPartition> partitions) and start to consume messages with offset >= T from partition P1.


This change, combined with the controller change 1) and 2) described above, ensures in-order message delivery for keyed messages in the presence of partition creation and deletion. 
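To summarize the gating logic in steps 4 and 5 above, here is an illustrative sketch; the helper names and map-based parameters are assumptions introduced only for this example.

Code Block
// Illustrative sketch of the two consumer-side gates described above.
import java.util.Map;

public class ConsumptionGates {

    // Step 4.2: a newly created partition may be consumed only after every priorPartition
    // has been consumed past its lowerOffsetThreshold.
    static boolean mayStartNewPartition(Map<Integer, Long> lowerOffsetThreshold,
                                        Map<Integer, Long> groupPosition) {
        return lowerOffsetThreshold.entrySet().stream()
                .allMatch(e -> groupPosition.getOrDefault(e.getKey(), 0L) > e.getValue());
    }

    // Step 5.3: a message at offset T from partition P1 may be delivered once every partition
    // marked for deletion whose leaderEpochBeforeDeletion includes P1 is either fully consumed
    // or T is within its upperOffsetThreshold.
    static boolean mayDeliverOffset(long offsetT,
                                    Map<Integer, Long> upperOffsetThreshold,
                                    Map<Integer, Long> groupPosition,
                                    Map<Integer, Long> logEndOffset) {
        return upperOffsetThreshold.entrySet().stream().allMatch(e -> {
            int p2 = e.getKey();
            boolean fullyConsumed =
                    groupPosition.getOrDefault(p2, 0L) >= logEndOffset.getOrDefault(p2, Long.MAX_VALUE);
            return fullyConsumed || offsetT <= e.getValue();
        });
    }
}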

 


...