...
Code Block | ||||
---|---|---|---|---|
| ||||
{ "apiKey": 5, "type": "request", "name": "StopReplicaRequest", // Version 1 adds the broker epoch and reorganizes the partitions to be stored // per topic. // // Version 2 is the first flexible version. // // Version 3 reorganizes the partitions to be stored, adds the leader epoch // and the delete partition fields per partition (KIP-570). "validVersions": "0-3", "flexibleVersions": "2+", "fields": [ { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId", "about": "The controller id." }, { "name": "ControllerEpoch", "type": "int32", "versions": "0+", "about": "The controller epoch." }, { "name": "BrokerEpoch", "type": "int64", "versions": "1+", "default": "-1", "ignorable": true, "about": "The broker epoch." }, { "name": "DeletePartitions", "type": "bool", "versions": "0+-2", "about": "Whether these partitions should be deleted." }, { "name": "UngroupedPartitions", "type": "[]StopReplicaPartitionV0", "versions": "0", "about": "The partitions to stop.", "fields": [ { "name": "TopicName", "type": "string", "versions": "0", "entityType": "topicName", "about": "The topic name." }, { "name": "PartitionIndex", "type": "int32", "versions": "0", "about": "The partition index." } ]}, { "name": "Topics", "type": "[]StopReplicaTopicStopReplicaTopicV1", "versions": "1+-2", "about": "The topics to stop.", "fields": [ { "name": "Name", "type": "string", "versions": "1+-2", "entityType": "topicName", "about": "The topic name." }, { "name": "PartitionIndexes", "type": "[]int32", "versions": "1+-2", "about": "The partition indexes." } ]}, // New Field Structure from V3 on { "name": "TopicStates", "type": "[]StopReplicaTopicState", "versions": "3+", "about": "Each topic.", "fields": [ { "name": "TopicName", "type": "string", "versions": "3+", "entityType": "topicName", "about": "The topic name." }, { "name": "PartitionStates", "type": "[]StopReplicaPartitionState", "versions": "3+", "about": "The state of each partition", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "3+", "about": "The partition index." }, { "name": "LeaderEpoch", "type": "int32", "versions": "3+", "default": "-1", "about": "The leader epoch." }, { "name": "DeletePartition", "type": "bool", "versions": "3+", "about": "Whether this partition should be deleted." } ]} ]} ] } |
Proposed Changes
The controller will include the leader epoch of each partition when sending out an StopReplicaRequest. The broker will verify the epoch of each partitions and send an `STALE`FENCED_CONTROLLERLEADER_EPOCH` error when the leader epoch received is older than the known one. `STALE_CONTROLLER_EPOCH` is used to remain consistent with the LeaderAndIsr API. When a topic is deleted, the leader epoch is not bumped. In this case, we will send a sentinel (-2) which overrides any existing epoch. Older version of the request will use a sentinel (-1) to indicate the the leader epoch is not present when the controller is still on the old version during the upgrade.
...